• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 21092969449

17 Jan 2026 10:39AM UTC coverage: 62.974% (+2.0%) from 60.961%
21092969449

push

github

kimata
refactor: 型安全性向上とコード品質改善(第4弾)

## 主な変更

### センサー
- ADS1015/ADS1115 の重複コードを ads_base.py に統合
- センサー系の型注釈改善

### json_util.py
- iso_pattern の重複定義をモジュールレベル定数に統一
- DateTimeJSONEncoder.default() の type: ignore を削除

### store/mercari
- MercariItem dataclass を導入し、item 辞書を型安全に
- ProgressObserver Protocol の型定義を MercariItem に変更

### store/amazon
- AmazonItem.to_dict() を dataclasses.asdict() で簡潔化

### その他
- flask_util.py の type: ignore を cast() に置換
- lifecycle_manager.py を削除(lifecycle/manager.py に移行済み)
- 各種型注釈・docstring の改善

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

191 of 246 new or added lines in 29 files covered. (77.64%)

112 existing lines in 12 files now uncovered.

3439 of 5461 relevant lines covered (62.97%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.08
/src/my_lib/sensor_data.py
1
#!/usr/bin/env python3
2
"""
3
InfluxDB からデータを取得します。
4

5
Usage:
6
  sensor_data.py [-c CONFIG] [-m MODE] [-i DB_SPEC] [-s SENSOR_SPEC] [-f FIELD] [-e EVERY] [-w WINDOW]
7
                 [-p HOURS] [-D]
8

9
Options:
10
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
11
                      [default: tests/fixtures/config.example.yaml]
12
  -m MODE           : データ取得モード。(data, day_sum, hour_sum, minute_sum のいずれか) [default: data]
13
  -i DB_SPEC        : 設定ファイルの中で InfluxDB の設定が書かれているパス。[default: sensor.influxdb]
14
  -s SENSOR_SPEC    : 設定ファイルの中で取得対象のデータの設定が書かれているパス。[default: sensor.lux]
15
  -f FIELD          : 取得するフィールド。[default: lux]
16
  -e EVERY          : 何分ごとのデータを取得するか。[default: 1]
17
  -w WINDOWE        : 算出に使うウィンドウ。[default: 5]
18
  -p PERIOD         : 積算(sum)モードの場合に、過去どのくらいの分を取得するか。[default: 1]
19
  -D                : デバッグモードで動作します。
20
"""
21

22
from __future__ import annotations
1✔
23

24
import asyncio
1✔
25
import datetime
1✔
26
import logging
1✔
27
import os
1✔
28
import time
1✔
29
from dataclasses import dataclass, field
1✔
30
from typing import Any, Self
1✔
31

32

33
@dataclass(frozen=True)
1✔
34
class InfluxDBConfig:
1✔
35
    """InfluxDB 接続設定"""
36

37
    url: str
1✔
38
    token: str
1✔
39
    org: str
1✔
40
    bucket: str
1✔
41

42
    @classmethod
1✔
43
    def parse(cls, data: dict[str, Any]) -> Self:
1✔
NEW
44
        return cls(
×
45
            url=data["url"],
46
            token=data["token"],
47
            org=data["org"],
48
            bucket=data["bucket"],
49
        )
50

51

52
import influxdb_client  # noqa: E402
1✔
53
from influxdb_client.client.flux_table import TableList  # noqa: E402
1✔
54

55
import my_lib.time  # noqa: E402
1✔
56

57

58
@dataclass(frozen=True)
1✔
59
class SensorDataResult:
1✔
60
    """センサーデータ取得結果
61

62
    Attributes:
63
        value: センサー値のリスト
64
        time: タイムスタンプのリスト
65
        valid: データが有効かどうか
66
        raw_record_count: 取得した生レコード数(処理前)
67
        null_count: None だったレコード数
68
        error_message: エラー発生時のメッセージ
69
    """
70

71
    value: list[float] = field(default_factory=list)
1✔
72
    time: list[datetime.datetime] = field(default_factory=list)
1✔
73
    valid: bool = False
1✔
74
    raw_record_count: int = 0
1✔
75
    null_count: int = 0
1✔
76
    error_message: str | None = None
1✔
77

78
    def get_diagnostic_message(self) -> str:
1✔
79
        """診断メッセージを生成"""
80
        if self.error_message:
×
81
            return f"接続エラー: {self.error_message}"
×
82
        if self.raw_record_count == 0:
×
83
            return "データなし: クエリ結果が空でした"
×
84
        if self.null_count == self.raw_record_count:
×
85
            return f"全データがNone: {self.raw_record_count}件すべてがNoneでした"
×
86
        if self.null_count > 0:
×
87
            return (
×
88
                f"一部データがNone: {self.raw_record_count}件中{self.null_count}件がNone、"
89
                f"有効データ{len(self.value)}件"
90
            )
91
        return f"データ取得成功: {len(self.value)}件"
×
92

93

94
@dataclass(frozen=True)
1✔
95
class DataRequest:
1✔
96
    """センサーデータ取得リクエスト"""
97

98
    measure: str
1✔
99
    hostname: str
1✔
100
    field: str
1✔
101
    start: str = "-30h"
1✔
102
    stop: str = "now()"
1✔
103
    every_min: int = 1
1✔
104
    window_min: int = 3
1✔
105
    create_empty: bool = True
1✔
106
    last: bool = False
1✔
107

108

109
# NOTE: データが欠損している期間も含めてデータを敷き詰めるため、
110
# timedMovingAverage を使う。timedMovingAverage の計算の結果、データが後ろに
111
# ずれるので、あらかじめ offset を使って前にずらしておく。
112
FLUX_QUERY = """
1✔
113
from(bucket: "{bucket}")
114
|> range(start: {start}, stop: {stop})
115
    |> filter(fn:(r) => r._measurement == "{measure}")
116
    |> filter(fn: (r) => r.hostname == "{hostname}")
117
    |> filter(fn: (r) => r["_field"] == "{field}")
118
    |> aggregateWindow(every: {window}m, offset:-{window}m, fn: mean, createEmpty: {create_empty})
119
    |> fill(usePrevious: true)
120
    |> timedMovingAverage(every: {every}m, period: {window}m)
121
"""
122

123
FLUX_QUERY_WITHOUT_AGGREGATION = """
1✔
124
from(bucket: "{bucket}")
125
|> range(start: {start}, stop: {stop})
126
    |> filter(fn:(r) => r._measurement == "{measure}")
127
    |> filter(fn: (r) => r.hostname == "{hostname}")
128
    |> filter(fn: (r) => r["_field"] == "{field}")
129
    |> fill(usePrevious: true)
130
"""
131

132
FLUX_SUM_QUERY = """
1✔
133
from(bucket: "{bucket}")
134
    |> range(start: {start}, stop: {stop})
135
    |> filter(fn:(r) => r._measurement == "{measure}")
136
    |> filter(fn: (r) => r.hostname == "{hostname}")
137
    |> filter(fn: (r) => r["_field"] == "{field}")
138
    |> aggregateWindow(every: {every}m, offset:-{every}m, fn: mean, createEmpty: {create_empty})
139
    |> filter(fn: (r) => exists r._value)
140
    |> fill(usePrevious: true)
141
    |> reduce(
142
        fn: (r, accumulator) => ({{sum: r._value + accumulator.sum, count: accumulator.count + 1}}),
143
        identity: {{sum: 0.0, count: 0}},
144
    )
145
"""
146

147
FLUX_EVENT_QUERY = """
1✔
148
from(bucket: "{bucket}")
149
    |> range(start: {start})
150
    |> filter(fn: (r) => r._measurement == "{measure}")
151
    |> filter(fn: (r) => r.hostname == "{hostname}")
152
    |> filter(fn: (r) => r["_field"] == "{field}")
153
    |> map(fn: (r) => ({{ r with _value: if r._value then 1 else 0 }}))
154
    |> difference()
155
    |> filter(fn: (r) => r._value == 1)
156
    |> sort(columns: ["_time"], desc: true)
157
    |> limit(n: 1)
158
"""
159

160

161
def _process_query_results(
1✔
162
    table_list: list[Any], create_empty: bool, last: bool, every_min: int, window_min: int
163
) -> SensorDataResult:
164
    """共通のクエリ結果処理ロジック"""
165
    data_list = []
1✔
166
    time_list = []
1✔
167
    localtime_offset = datetime.timedelta(hours=9)
1✔
168

169
    # 診断情報
170
    raw_record_count = 0
1✔
171
    null_count = 0
1✔
172

173
    if len(table_list) != 0:
1✔
174
        raw_record_count = len(table_list[0].records)
1✔
175
        for record in table_list[0].records:
1✔
176
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
177
            # だとタイミングによって、先頭に None が入る
178
            if record.get_value() is None:
1✔
179
                logging.debug("DELETE %s", record.get_time() + localtime_offset)
1✔
180
                null_count += 1
1✔
181
                continue
1✔
182

183
            data_list.append(record.get_value())
1✔
184
            time_list.append(record.get_time() + localtime_offset)
1✔
185

186
    if create_empty and not last:
1✔
187
        # NOTE: aggregateWindow(createEmpty: true) と timedMovingAverage を使うと、
188
        # 末尾に余分なデータが入るので取り除く
189
        every_min = int(every_min)
1✔
190
        window_min = int(window_min)
1✔
191
        if window_min > every_min:
1✔
192
            trim_count = window_min - every_min
1✔
193
            # データが十分にある場合のみ切り詰め
194
            if len(data_list) > trim_count:
1✔
195
                data_list = data_list[:-trim_count]
1✔
196
                time_list = time_list[:-trim_count]
1✔
197
            else:
198
                logging.warning(
1✔
199
                    "Insufficient data to trim: data_count=%d, trim_count=%d",
200
                    len(data_list),
201
                    trim_count,
202
                )
203

204
    logging.debug("data count = %s", len(time_list))
1✔
205
    return SensorDataResult(
1✔
206
        value=data_list,
207
        time=time_list,
208
        valid=len(time_list) != 0,
209
        raw_record_count=raw_record_count,
210
        null_count=null_count,
211
    )
212

213

214
def _fetch_data_impl(
1✔
215
    db_config: InfluxDBConfig,
216
    template: str,
217
    measure: str,
218
    hostname: str,
219
    field: str,
220
    start: str,
221
    stop: str,
222
    every: int,
223
    window: int,
224
    create_empty: bool,
225
    last: bool = False,
226
) -> TableList:
227
    client = None
×
228
    try:
×
NEW
229
        token = os.environ.get("INFLUXDB_TOKEN", db_config.token)
×
230

231
        query = template.format(
×
232
            bucket=db_config.bucket,
233
            measure=measure,
234
            hostname=hostname,
235
            field=field,
236
            start=start,
237
            stop=stop,
238
            every=every,
239
            window=window,
240
            create_empty=str(create_empty).lower(),
241
        )
242
        if last:
×
243
            query += " |> last()"
×
244

245
        logging.debug("Flux query = %s", query)
×
NEW
246
        client = influxdb_client.InfluxDBClient(url=db_config.url, token=token, org=db_config.org)  # type: ignore[reportPrivateImportUsage]
×
UNCOV
247
        query_api = client.query_api()
×
248

249
        return query_api.query(query=query)
×
250
    except Exception:
×
251
        logging.exception("Failed to fetch data")
×
252
        raise
×
253
    finally:
254
        if client is not None:
×
255
            client.close()
×
256

257

258
async def _fetch_data_impl_async(
1✔
259
    db_config: InfluxDBConfig,
260
    template: str,
261
    measure: str,
262
    hostname: str,
263
    field: str,
264
    start: str,
265
    stop: str,
266
    every: int,
267
    window: int,
268
    create_empty: bool,
269
    last: bool = False,
270
) -> TableList:
271
    """非同期版のデータ取得実装"""
272
    loop = asyncio.get_event_loop()
×
273
    return await loop.run_in_executor(
×
274
        None,
275
        _fetch_data_impl,
276
        db_config,
277
        template,
278
        measure,
279
        hostname,
280
        field,
281
        start,
282
        stop,
283
        every,
284
        window,
285
        create_empty,
286
        last,
287
    )
288

289

290
def fetch_data(
1✔
291
    db_config: InfluxDBConfig,
292
    measure: str,
293
    hostname: str,
294
    field: str,
295
    start: str = "-30h",
296
    stop: str = "now()",
297
    every_min: int = 1,
298
    window_min: int = 3,
299
    create_empty: bool = True,
300
    last: bool = False,
301
) -> SensorDataResult:
302
    time_start = time.time()
1✔
303
    logging.debug(
1✔
304
        (
305
            "Fetch data (measure: %s, host: %s, field: %s, "
306
            "start: %s, stop: %s, every: %dmin, window: %dmin, "
307
            "create_empty: %s, last: %s)"
308
        ),
309
        measure,
310
        hostname,
311
        field,
312
        start,
313
        stop,
314
        every_min,
315
        window_min,
316
        create_empty,
317
        last,
318
    )
319

320
    try:
1✔
321
        template = FLUX_QUERY_WITHOUT_AGGREGATION if window_min == 0 else FLUX_QUERY
1✔
322

323
        table_list = _fetch_data_impl(
1✔
324
            db_config,
325
            template,
326
            measure,
327
            hostname,
328
            field,
329
            start,
330
            stop,
331
            every_min,
332
            window_min,
333
            create_empty,
334
            last,
335
        )
336
        time_fetched = time.time()
1✔
337

338
        result = _process_query_results(table_list, create_empty, last, every_min, window_min)
1✔
339

340
        time_finish = time.time()
1✔
341
        if ((time_fetched - time_start) > 1) or ((time_finish - time_fetched) > 0.1):
1✔
342
            logging.warning(
×
343
                "It's taking too long to retrieve the data. (fetch: %.2f sec, modify: %.2f sec)",
344
                time_fetched - time_start,
345
                time_finish - time_fetched,
346
            )
347

348
        return result
1✔
349
    except Exception as e:
1✔
350
        logging.exception("Failed to fetch data")
1✔
351

352
        return SensorDataResult(error_message=str(e))
1✔
353

354

355
async def fetch_data_async(
1✔
356
    db_config: InfluxDBConfig,
357
    measure: str,
358
    hostname: str,
359
    field: str,
360
    start: str = "-30h",
361
    stop: str = "now()",
362
    every_min: int = 1,
363
    window_min: int = 3,
364
    create_empty: bool = True,
365
    last: bool = False,
366
) -> SensorDataResult:
367
    """非同期版のfetch_data"""
368
    time_start = time.time()
1✔
369
    logging.debug(
1✔
370
        (
371
            "Fetch data async (measure: %s, host: %s, field: %s, "
372
            "start: %s, stop: %s, every: %dmin, window: %dmin, "
373
            "create_empty: %s, last: %s)"
374
        ),
375
        measure,
376
        hostname,
377
        field,
378
        start,
379
        stop,
380
        every_min,
381
        window_min,
382
        create_empty,
383
        last,
384
    )
385

386
    try:
1✔
387
        template = FLUX_QUERY_WITHOUT_AGGREGATION if window_min == 0 else FLUX_QUERY
1✔
388

389
        table_list = await _fetch_data_impl_async(
1✔
390
            db_config,
391
            template,
392
            measure,
393
            hostname,
394
            field,
395
            start,
396
            stop,
397
            every_min,
398
            window_min,
399
            create_empty,
400
            last,
401
        )
402
        time_fetched = time.time()
1✔
403

404
        result = _process_query_results(table_list, create_empty, last, every_min, window_min)
1✔
405

406
        time_finish = time.time()
1✔
407
        if ((time_fetched - time_start) > 1) or ((time_finish - time_fetched) > 0.1):
1✔
408
            logging.warning(
×
409
                "It's taking too long to retrieve the data. (fetch: %.2f sec, modify: %.2f sec)",
410
                time_fetched - time_start,
411
                time_finish - time_fetched,
412
            )
413

414
        return result
1✔
415
    except Exception as e:
×
416
        logging.exception("Failed to fetch data")
×
417

418
        return SensorDataResult(error_message=str(e))
×
419

420

421
async def fetch_data_parallel(
1✔
422
    db_config: InfluxDBConfig, requests: list[DataRequest]
423
) -> list[SensorDataResult | BaseException]:
424
    """
425
    複数のデータ取得リクエストを並列実行
426

427
    Args:
428
    ----
429
        db_config: InfluxDBの設定(全リクエスト共通)
430
        requests: DataRequest のリスト
431

432
    Returns:
433
    -------
434
        各リクエストの結果を含むリスト
435

436
    """
437
    tasks = []
1✔
438
    for req in requests:
1✔
439
        task = fetch_data_async(
1✔
440
            db_config,
441
            req.measure,
442
            req.hostname,
443
            req.field,
444
            req.start,
445
            req.stop,
446
            req.every_min,
447
            req.window_min,
448
            req.create_empty,
449
            req.last,
450
        )
451
        tasks.append(task)
1✔
452

453
    return await asyncio.gather(*tasks, return_exceptions=True)
1✔
454

455

456
def get_equip_on_minutes(
1✔
457
    config: InfluxDBConfig,
458
    measure: str,
459
    hostname: str,
460
    field: str,
461
    threshold: float,
462
    start: str = "-30h",
463
    stop: str = "now()",
464
    every_min: int = 1,
465
    window_min: int = 5,
466
    create_empty: bool = True,
467
) -> int:
468
    logging.info(
1✔
469
        (
470
            "Get 'ON' minutes (type: %s, host: %d, field: %d{field}, "
471
            "threshold: %.2f, start: %s, stop: %s, every: %smin, "
472
            "window: %dmin, create_empty: %s)"
473
        ),
474
        measure,
475
        hostname,
476
        field,
477
        threshold,
478
        start,
479
        stop,
480
        every_min,
481
        window_min,
482
        create_empty,
483
    )
484

485
    try:
1✔
486
        table_list = _fetch_data_impl(
1✔
487
            config,
488
            FLUX_QUERY,
489
            measure,
490
            hostname,
491
            field,
492
            start,
493
            stop,
494
            every_min,
495
            window_min,
496
            create_empty,
497
        )
498

499
        if len(table_list) == 0:
1✔
500
            return 0
1✔
501

502
        count = 0
1✔
503

504
        every_min = int(every_min)
1✔
505
        window_min = int(window_min)
1✔
506
        record_num = len(table_list[0].records)
1✔
507
        for i, record in enumerate(table_list[0].records):
1✔
508
            if create_empty and (window_min > every_min) and (i > record_num - 1 - (window_min - every_min)):
1✔
509
                # NOTE: timedMovingAverage を使うと、末尾に余分なデータが入るので取り除く
510
                continue
×
511

512
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
513
            # だとタイミングによって、先頭に None が入る
514
            if record.get_value() is None:
1✔
515
                continue
×
516
            if record.get_value() >= threshold:
1✔
517
                count += 1
1✔
518

519
        return count * int(every_min)
1✔
520
    except Exception:
1✔
521
        logging.exception("Failed to fetch data")
1✔
522
        return 0
1✔
523

524

525
def get_equip_mode_period(
1✔
526
    config: InfluxDBConfig,
527
    measure: str,
528
    hostname: str,
529
    field: str,
530
    threshold_list: list[float],
531
    start: str = "-30h",
532
    stop: str = "now()",
533
    every_min: int = 10,
534
    window_min: int = 10,
535
    create_empty: bool = True,
536
) -> list[list[Any]]:
537
    logging.info(
×
538
        "Get equipment mode period (type: %s, host: %s, field: %s, "
539
        "threshold: %s, start: %s, stop: %s, every: %dmin, "
540
        "window: %dmin, create_empty: %s)",
541
        measure,
542
        hostname,
543
        field,
544
        f"[{','.join(f'{v:.1f}' for v in threshold_list)}]",
545
        start,
546
        stop,
547
        every_min,
548
        window_min,
549
        create_empty,
550
    )
551

552
    try:
×
553
        table_list = _fetch_data_impl(
×
554
            config,
555
            FLUX_QUERY,
556
            measure,
557
            hostname,
558
            field,
559
            start,
560
            stop,
561
            every_min,
562
            window_min,
563
            create_empty,
564
        )
565

566
        if len(table_list) == 0:
×
567
            return []
×
568

569
        # NOTE: 常時冷却と間欠冷却の期間を求める
570
        on_range = []
×
571
        state = -1
×
572
        start_time = None
×
573
        prev_time = None
×
574
        localtime_offset = datetime.timedelta(hours=9)
×
575

576
        for record in table_list[0].records:
×
577
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
578
            # だとタイミングによって、先頭に None が入る
579
            if record.get_value() is None:
×
580
                logging.debug("DELETE %s", record.get_time() + localtime_offset)
×
581
                continue
×
582

583
            is_idle = True
×
584
            for i in range(len(threshold_list)):
×
585
                if record.get_value() > threshold_list[i]:
×
586
                    if state != i:
×
587
                        if state != -1:
×
588
                            assert start_time is not None  # noqa: S101
×
589
                            assert prev_time is not None  # noqa: S101
×
590
                            on_range.append(
×
591
                                [
592
                                    start_time + localtime_offset,
593
                                    prev_time + localtime_offset,
594
                                    state,
595
                                ]
596
                            )
597
                        state = i
×
598
                        start_time = record.get_time()
×
599
                    is_idle = False
×
600
                    break
×
601
            if is_idle and state != -1:
×
602
                assert start_time is not None  # noqa: S101
×
603
                assert prev_time is not None  # noqa: S101
×
604
                on_range.append(
×
605
                    [
606
                        start_time + localtime_offset,
607
                        prev_time + localtime_offset,
608
                        state,
609
                    ]
610
                )
611
                state = -1
×
612
                start_time = record.get_time()
×
613

614
            prev_time = record.get_time()
×
615

616
        if state != -1:
×
617
            assert start_time is not None  # noqa: S101
×
618
            on_range.append(
×
619
                [
620
                    start_time + localtime_offset,
621
                    table_list[0].records[-1].get_time() + localtime_offset,
622
                    state,
623
                ]
624
            )
625
        return on_range
×
626
    except Exception:
×
627
        logging.exception("Failed to fetch data")
×
628
        return []
×
629

630

631
def get_sum(
1✔
632
    config: InfluxDBConfig,
633
    measure: str,
634
    hostname: str,
635
    field: str,
636
    start: str = "-3m",
637
    stop: str = "now()",
638
    every_min: int = 1,
639
    window_min: int = 3,
640
) -> float:
641
    try:
1✔
642
        table_list = _fetch_data_impl(
1✔
643
            config, FLUX_SUM_QUERY, measure, hostname, field, start, stop, every_min, window_min, True
644
        )
645

646
        value_list = table_list.to_values(columns=["count", "sum"])
1✔
647

648
        if len(value_list) == 0:
1✔
649
            return 0
1✔
650
        else:
651
            sum_value = value_list[0][1]
1✔
652
            if isinstance(sum_value, int | float):
1✔
653
                return float(sum_value)
1✔
654
            logging.warning("Unexpected sum value type: %s (value=%s)", type(sum_value).__name__, sum_value)
×
655
            return 0
×
656
    except Exception:
1✔
657
        logging.exception("Failed to fetch data")
1✔
658
        return 0
1✔
659

660

661
def get_day_sum(
1✔
662
    config: InfluxDBConfig,
663
    measure: str,
664
    hostname: str,
665
    field: str,
666
    days: int,
667
    day_before: int = 0,
668
    day_offset: int = 0,
669
    every_min: int = 1,
670
    window_min: int = 5,
671
) -> float:
672
    now = my_lib.time.now()
1✔
673

674
    if day_before == 0:
1✔
675
        start = f"-{day_offset + days - 1}d{now.hour}h{now.minute}m"
1✔
676
        stop = f"-{day_offset}d"
1✔
677
    else:
678
        start = f"-{day_before + day_offset + days - 1}d{now.hour}h{now.minute}m"
×
679
        stop = f"-{day_before + day_offset - 1}d{now.hour}h{now.minute}m"
×
680

681
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
682

683

684
def get_hour_sum(
1✔
685
    config: InfluxDBConfig,
686
    measure: str,
687
    hostname: str,
688
    field: str,
689
    hours: int,
690
    day_offset: int = 0,
691
    every_min: int = 1,
692
    window_min: int = 1,
693
) -> float:
694
    start = f"-{day_offset * 24 + hours}h"
1✔
695
    stop = f"-{day_offset * 24}h"
1✔
696

697
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
698

699

700
def get_minute_sum(
1✔
701
    config: InfluxDBConfig,
702
    measure: str,
703
    hostname: str,
704
    field: str,
705
    minutes: int,
706
    day_offset: int = 0,
707
    every_min: int = 1,
708
    window_min: int = 1,
709
) -> float:
710
    start = f"-{day_offset * 24 * 60 + minutes}m"
1✔
711
    stop = f"-{day_offset * 24 * 60}m"
1✔
712

713
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
714

715

716
def get_last_event(
1✔
717
    config: InfluxDBConfig, measure: str, hostname: str, field: str, start: str = "-7d"
718
) -> datetime.datetime | None:
719
    try:
1✔
720
        table_list = _fetch_data_impl(
1✔
721
            config, FLUX_EVENT_QUERY, measure, hostname, field, start, "now()", 0, 0, False
722
        )
723

724
        value_list = table_list.to_values(columns=["_time"])
1✔
725

726
        if len(value_list) == 0:
1✔
727
            return None
1✔
728
        else:
729
            time_value = value_list[0][0]
1✔
730
            if isinstance(time_value, datetime.datetime):
1✔
731
                return time_value
1✔
732
            logging.warning(
×
733
                "Unexpected time value type: %s (value=%s)", type(time_value).__name__, time_value
734
            )
735
            return None
×
736
    except Exception:
1✔
737
        logging.exception("Failed to fetch data")
1✔
738
        return None
1✔
739

740

741
def dump_data(data: SensorDataResult) -> None:
1✔
742
    for i in range(len(data.time)):
1✔
743
        logging.info("%s: %s", data.time[i], data.value[i])
1✔
744

745

746
if __name__ == "__main__":
747
    # TEST Code
748
    import docopt
749

750
    import my_lib.config
751
    import my_lib.logger
752
    import my_lib.pretty
753

754
    def get_config(config, dotted_key):
755
        keys = dotted_key.split(".")
756
        value = config
757

758
        for key in keys:
759
            value = value[key]
760

761
        return value
762

763
    assert __doc__ is not None  # noqa: S101
764
    args = docopt.docopt(__doc__)
765

766
    config_file = args["-c"]
767
    mode = args["-m"]
768
    every = args["-e"]
769
    window = args["-w"]
770
    infxlux_db_spec = args["-i"]
771
    sensor_spec = args["-s"]
772
    field_name = args["-f"]
773
    period = int(args["-p"])
774
    debug_mode = args["-D"]
775

776
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
777

778
    config = my_lib.config.load(config_file)
779

780
    db_config = InfluxDBConfig.parse(get_config(config, infxlux_db_spec))
781
    sensor_config = get_config(config, sensor_spec)
782

783
    logging.info("DB config: %s", my_lib.pretty.format(db_config))
784
    logging.info("Sensor config: %s", my_lib.pretty.format(sensor_config))
785

786
    result: SensorDataResult | float
787
    if mode == "data":
788
        result = fetch_data(
789
            db_config,
790
            sensor_config["measure"],
791
            sensor_config["hostname"],
792
            field_name,
793
            start="-10m",
794
            stop="now()",
795
            every_min=1,
796
            window_min=3,
797
            create_empty=True,
798
            last=False,
799
        )
800
    elif mode == "day_sum":
801
        result = get_day_sum(
802
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
803
        )
804
    elif mode == "hour_sum":
805
        result = get_hour_sum(
806
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
807
        )
808
    elif mode == "minute_sum":
809
        result = get_minute_sum(
810
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
811
        )
812
    else:
813
        logging.error("Unknown mode: %s", mode)
814
        result = 0.0
815

816
    logging.info(my_lib.pretty.format(result))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc