• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kimata / my-py-lib / 24995161894

27 Apr 2026 12:31PM UTC coverage: 59.648%. Remained the same
24995161894

push

github

kimata
feat: sensor_data.fetch_data に timeout_sec パラメータを追加

InfluxDBClient のデフォルトタイムアウト 10 秒は、ホスト側の
一時的な遅延でアプリのスケジューラループが長時間ブロックされる
原因になるため、呼び出し側で短縮できるように引数化した。
デフォルトは従来通り 10.0 秒で後方互換を維持する。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

3867 of 6483 relevant lines covered (59.65%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.08
/src/my_lib/sensor_data.py
1
#!/usr/bin/env python3
2
"""
3
InfluxDB からデータを取得します。
4

5
Usage:
6
  sensor_data.py [-c CONFIG] [-m MODE] [-i DB_SPEC] [-s SENSOR_SPEC] [-f FIELD] [-e EVERY] [-w WINDOW]
7
                 [-p HOURS] [-D]
8

9
Options:
10
  -c CONFIG         : CONFIG を設定ファイルとして読み込んで実行します。
11
                      [default: tests/fixtures/config.example.yaml]
12
  -m MODE           : データ取得モード。(data, day_sum, hour_sum, minute_sum のいずれか) [default: data]
13
  -i DB_SPEC        : 設定ファイルの中で InfluxDB の設定が書かれているパス。[default: sensor.influxdb]
14
  -s SENSOR_SPEC    : 設定ファイルの中で取得対象のデータの設定が書かれているパス。[default: sensor.lux]
15
  -f FIELD          : 取得するフィールド。[default: lux]
16
  -e EVERY          : 何分ごとのデータを取得するか。[default: 1]
17
  -w WINDOWE        : 算出に使うウィンドウ。[default: 5]
18
  -p PERIOD         : 積算(sum)モードの場合に、過去どのくらいの分を取得するか。[default: 1]
19
  -D                : デバッグモードで動作します。
20
"""
21

22
from __future__ import annotations
1✔
23

24
import asyncio
1✔
25
import datetime
1✔
26
import logging
1✔
27
import os
1✔
28
import time
1✔
29
from dataclasses import dataclass, field
1✔
30
from typing import Any, Self
1✔
31

32

33
@dataclass(frozen=True)
1✔
34
class InfluxDBConfig:
1✔
35
    """InfluxDB 接続設定"""
36

37
    url: str
1✔
38
    token: str
1✔
39
    org: str
1✔
40
    bucket: str
1✔
41

42
    @classmethod
1✔
43
    def parse(cls, data: dict[str, Any]) -> Self:
1✔
44
        return cls(
×
45
            url=data["url"],
46
            token=data["token"],
47
            org=data["org"],
48
            bucket=data["bucket"],
49
        )
50

51

52
import influxdb_client  # noqa: E402
1✔
53
from influxdb_client.client.flux_table import TableList  # noqa: E402
1✔
54

55
import my_lib.time  # noqa: E402
1✔
56

57

58
@dataclass(frozen=True)
1✔
59
class SensorDataResult:
1✔
60
    """センサーデータ取得結果
61

62
    Attributes:
63
        value: センサー値のリスト
64
        time: タイムスタンプのリスト
65
        valid: データが有効かどうか
66
        raw_record_count: 取得した生レコード数(処理前)
67
        null_count: None だったレコード数
68
        error_message: エラー発生時のメッセージ
69
    """
70

71
    value: list[float] = field(default_factory=list)
1✔
72
    time: list[datetime.datetime] = field(default_factory=list)
1✔
73
    valid: bool = False
1✔
74
    raw_record_count: int = 0
1✔
75
    null_count: int = 0
1✔
76
    error_message: str | None = None
1✔
77

78
    def get_diagnostic_message(self) -> str:
1✔
79
        """診断メッセージを生成"""
80
        if self.error_message:
×
81
            return f"接続エラー: {self.error_message}"
×
82
        if self.raw_record_count == 0:
×
83
            return "データなし: クエリ結果が空でした"
×
84
        if self.null_count == self.raw_record_count:
×
85
            return f"全データがNone: {self.raw_record_count}件すべてがNoneでした"
×
86
        if self.null_count > 0:
×
87
            return (
×
88
                f"一部データがNone: {self.raw_record_count}件中{self.null_count}件がNone、"
89
                f"有効データ{len(self.value)}件"
90
            )
91
        return f"データ取得成功: {len(self.value)}件"
×
92

93

94
@dataclass(frozen=True)
1✔
95
class DataRequest:
1✔
96
    """センサーデータ取得リクエスト"""
97

98
    measure: str
1✔
99
    hostname: str
1✔
100
    field: str
1✔
101
    start: str = "-30h"
1✔
102
    stop: str = "now()"
1✔
103
    every_min: int = 1
1✔
104
    window_min: int = 3
1✔
105
    create_empty: bool = True
1✔
106
    last: bool = False
1✔
107

108

109
# NOTE: データが欠損している期間も含めてデータを敷き詰めるため、
110
# timedMovingAverage を使う。timedMovingAverage の計算の結果、データが後ろに
111
# ずれるので、あらかじめ offset を使って前にずらしておく。
112
FLUX_QUERY = """
1✔
113
from(bucket: "{bucket}")
114
|> range(start: {start}, stop: {stop})
115
    |> filter(fn:(r) => r._measurement == "{measure}")
116
    |> filter(fn: (r) => r.hostname == "{hostname}")
117
    |> filter(fn: (r) => r["_field"] == "{field}")
118
    |> aggregateWindow(every: {window}m, offset:-{window}m, fn: mean, createEmpty: {create_empty})
119
    |> fill(usePrevious: true)
120
    |> timedMovingAverage(every: {every}m, period: {window}m)
121
"""
122

123
FLUX_QUERY_WITHOUT_AGGREGATION = """
1✔
124
from(bucket: "{bucket}")
125
|> range(start: {start}, stop: {stop})
126
    |> filter(fn:(r) => r._measurement == "{measure}")
127
    |> filter(fn: (r) => r.hostname == "{hostname}")
128
    |> filter(fn: (r) => r["_field"] == "{field}")
129
    |> fill(usePrevious: true)
130
"""
131

132
FLUX_SUM_QUERY = """
1✔
133
from(bucket: "{bucket}")
134
    |> range(start: {start}, stop: {stop})
135
    |> filter(fn:(r) => r._measurement == "{measure}")
136
    |> filter(fn: (r) => r.hostname == "{hostname}")
137
    |> filter(fn: (r) => r["_field"] == "{field}")
138
    |> aggregateWindow(every: {every}m, offset:-{every}m, fn: mean, createEmpty: {create_empty})
139
    |> filter(fn: (r) => exists r._value)
140
    |> fill(usePrevious: true)
141
    |> reduce(
142
        fn: (r, accumulator) => ({{sum: r._value + accumulator.sum, count: accumulator.count + 1}}),
143
        identity: {{sum: 0.0, count: 0}},
144
    )
145
"""
146

147
FLUX_EVENT_QUERY = """
1✔
148
from(bucket: "{bucket}")
149
    |> range(start: {start})
150
    |> filter(fn: (r) => r._measurement == "{measure}")
151
    |> filter(fn: (r) => r.hostname == "{hostname}")
152
    |> filter(fn: (r) => r["_field"] == "{field}")
153
    |> map(fn: (r) => ({{ r with _value: if r._value then 1 else 0 }}))
154
    |> difference()
155
    |> filter(fn: (r) => r._value == 1)
156
    |> sort(columns: ["_time"], desc: true)
157
    |> limit(n: 1)
158
"""
159

160

161
def _process_query_results(
1✔
162
    table_list: list[Any], create_empty: bool, last: bool, every_min: int, window_min: int
163
) -> SensorDataResult:
164
    """共通のクエリ結果処理ロジック"""
165
    data_list = []
1✔
166
    time_list = []
1✔
167
    localtime_offset = datetime.timedelta(hours=9)
1✔
168

169
    # 診断情報
170
    raw_record_count = 0
1✔
171
    null_count = 0
1✔
172

173
    if len(table_list) != 0:
1✔
174
        raw_record_count = len(table_list[0].records)
1✔
175
        for record in table_list[0].records:
1✔
176
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
177
            # だとタイミングによって、先頭に None が入る
178
            if record.get_value() is None:
1✔
179
                logging.debug("DELETE %s", record.get_time() + localtime_offset)
1✔
180
                null_count += 1
1✔
181
                continue
1✔
182

183
            data_list.append(record.get_value())
1✔
184
            time_list.append(record.get_time() + localtime_offset)
1✔
185

186
    if create_empty and not last:
1✔
187
        # NOTE: aggregateWindow(createEmpty: true) と timedMovingAverage を使うと、
188
        # 末尾に余分なデータが入るので取り除く
189
        every_min = int(every_min)
1✔
190
        window_min = int(window_min)
1✔
191
        if window_min > every_min:
1✔
192
            trim_count = window_min - every_min
1✔
193
            # データが十分にある場合のみ切り詰め
194
            if len(data_list) > trim_count:
1✔
195
                data_list = data_list[:-trim_count]
1✔
196
                time_list = time_list[:-trim_count]
1✔
197
            else:
198
                logging.warning(
1✔
199
                    "Insufficient data to trim: data_count=%d, trim_count=%d",
200
                    len(data_list),
201
                    trim_count,
202
                )
203

204
    logging.debug("data count = %s", len(time_list))
1✔
205
    return SensorDataResult(
1✔
206
        value=data_list,
207
        time=time_list,
208
        valid=len(time_list) != 0,
209
        raw_record_count=raw_record_count,
210
        null_count=null_count,
211
    )
212

213

214
def _fetch_data_impl(
1✔
215
    db_config: InfluxDBConfig,
216
    template: str,
217
    measure: str,
218
    hostname: str,
219
    field: str,
220
    start: str,
221
    stop: str,
222
    every: int,
223
    window: int,
224
    create_empty: bool,
225
    last: bool = False,
226
    timeout_sec: float = 10.0,
227
) -> TableList:
228
    client = None
×
229
    try:
×
230
        token = os.environ.get("INFLUXDB_TOKEN", db_config.token)
×
231

232
        query = template.format(
×
233
            bucket=db_config.bucket,
234
            measure=measure,
235
            hostname=hostname,
236
            field=field,
237
            start=start,
238
            stop=stop,
239
            every=every,
240
            window=window,
241
            create_empty=str(create_empty).lower(),
242
        )
243
        if last:
×
244
            query += " |> last()"
×
245

246
        logging.debug("Flux query = %s", query)
×
NEW
247
        client = influxdb_client.InfluxDBClient(  # pyright: ignore[reportPrivateImportUsage]
×
248
            url=db_config.url, token=token, org=db_config.org, timeout=int(timeout_sec * 1000)
249
        )
UNCOV
250
        query_api = client.query_api()
×
251

252
        return query_api.query(query=query)
×
253
    except Exception:
×
254
        logging.exception("Failed to fetch data")
×
255
        raise
×
256
    finally:
257
        if client is not None:
×
258
            client.close()
×
259

260

261
async def _fetch_data_impl_async(
1✔
262
    db_config: InfluxDBConfig,
263
    template: str,
264
    measure: str,
265
    hostname: str,
266
    field: str,
267
    start: str,
268
    stop: str,
269
    every: int,
270
    window: int,
271
    create_empty: bool,
272
    last: bool = False,
273
    timeout_sec: float = 10.0,
274
) -> TableList:
275
    """非同期版のデータ取得実装"""
276
    loop = asyncio.get_event_loop()
×
277
    return await loop.run_in_executor(
×
278
        None,
279
        _fetch_data_impl,
280
        db_config,
281
        template,
282
        measure,
283
        hostname,
284
        field,
285
        start,
286
        stop,
287
        every,
288
        window,
289
        create_empty,
290
        last,
291
        timeout_sec,
292
    )
293

294

295
def fetch_data(
1✔
296
    db_config: InfluxDBConfig,
297
    measure: str,
298
    hostname: str,
299
    field: str,
300
    start: str = "-30h",
301
    stop: str = "now()",
302
    every_min: int = 1,
303
    window_min: int = 3,
304
    create_empty: bool = True,
305
    last: bool = False,
306
    timeout_sec: float = 10.0,
307
) -> SensorDataResult:
308
    time_start = time.time()
1✔
309
    logging.debug(
1✔
310
        (
311
            "Fetch data (measure: %s, host: %s, field: %s, "
312
            "start: %s, stop: %s, every: %dmin, window: %dmin, "
313
            "create_empty: %s, last: %s)"
314
        ),
315
        measure,
316
        hostname,
317
        field,
318
        start,
319
        stop,
320
        every_min,
321
        window_min,
322
        create_empty,
323
        last,
324
    )
325

326
    try:
1✔
327
        template = FLUX_QUERY_WITHOUT_AGGREGATION if window_min == 0 else FLUX_QUERY
1✔
328

329
        table_list = _fetch_data_impl(
1✔
330
            db_config,
331
            template,
332
            measure,
333
            hostname,
334
            field,
335
            start,
336
            stop,
337
            every_min,
338
            window_min,
339
            create_empty,
340
            last,
341
            timeout_sec,
342
        )
343
        time_fetched = time.time()
1✔
344

345
        result = _process_query_results(table_list, create_empty, last, every_min, window_min)
1✔
346

347
        time_finish = time.time()
1✔
348
        if ((time_fetched - time_start) > 1) or ((time_finish - time_fetched) > 0.1):
1✔
349
            logging.warning(
×
350
                "It's taking too long to retrieve the data. (fetch: %.2f sec, modify: %.2f sec)",
351
                time_fetched - time_start,
352
                time_finish - time_fetched,
353
            )
354

355
        return result
1✔
356
    except Exception as e:
1✔
357
        logging.exception("Failed to fetch data")
1✔
358

359
        return SensorDataResult(error_message=str(e))
1✔
360

361

362
async def fetch_data_async(
1✔
363
    db_config: InfluxDBConfig,
364
    measure: str,
365
    hostname: str,
366
    field: str,
367
    start: str = "-30h",
368
    stop: str = "now()",
369
    every_min: int = 1,
370
    window_min: int = 3,
371
    create_empty: bool = True,
372
    last: bool = False,
373
    timeout_sec: float = 10.0,
374
) -> SensorDataResult:
375
    """非同期版のfetch_data"""
376
    time_start = time.time()
1✔
377
    logging.debug(
1✔
378
        (
379
            "Fetch data async (measure: %s, host: %s, field: %s, "
380
            "start: %s, stop: %s, every: %dmin, window: %dmin, "
381
            "create_empty: %s, last: %s)"
382
        ),
383
        measure,
384
        hostname,
385
        field,
386
        start,
387
        stop,
388
        every_min,
389
        window_min,
390
        create_empty,
391
        last,
392
    )
393

394
    try:
1✔
395
        template = FLUX_QUERY_WITHOUT_AGGREGATION if window_min == 0 else FLUX_QUERY
1✔
396

397
        table_list = await _fetch_data_impl_async(
1✔
398
            db_config,
399
            template,
400
            measure,
401
            hostname,
402
            field,
403
            start,
404
            stop,
405
            every_min,
406
            window_min,
407
            create_empty,
408
            last,
409
            timeout_sec,
410
        )
411
        time_fetched = time.time()
1✔
412

413
        result = _process_query_results(table_list, create_empty, last, every_min, window_min)
1✔
414

415
        time_finish = time.time()
1✔
416
        if ((time_fetched - time_start) > 1) or ((time_finish - time_fetched) > 0.1):
1✔
417
            logging.warning(
×
418
                "It's taking too long to retrieve the data. (fetch: %.2f sec, modify: %.2f sec)",
419
                time_fetched - time_start,
420
                time_finish - time_fetched,
421
            )
422

423
        return result
1✔
424
    except Exception as e:
×
425
        logging.exception("Failed to fetch data")
×
426

427
        return SensorDataResult(error_message=str(e))
×
428

429

430
async def fetch_data_parallel(
1✔
431
    db_config: InfluxDBConfig, requests: list[DataRequest]
432
) -> list[SensorDataResult | BaseException]:
433
    """
434
    複数のデータ取得リクエストを並列実行
435

436
    Args:
437
    ----
438
        db_config: InfluxDBの設定(全リクエスト共通)
439
        requests: DataRequest のリスト
440

441
    Returns:
442
    -------
443
        各リクエストの結果を含むリスト
444

445
    """
446
    tasks = []
1✔
447
    for req in requests:
1✔
448
        task = fetch_data_async(
1✔
449
            db_config,
450
            req.measure,
451
            req.hostname,
452
            req.field,
453
            req.start,
454
            req.stop,
455
            req.every_min,
456
            req.window_min,
457
            req.create_empty,
458
            req.last,
459
        )
460
        tasks.append(task)
1✔
461

462
    return await asyncio.gather(*tasks, return_exceptions=True)
1✔
463

464

465
def get_equip_on_minutes(
1✔
466
    config: InfluxDBConfig,
467
    measure: str,
468
    hostname: str,
469
    field: str,
470
    threshold: float,
471
    start: str = "-30h",
472
    stop: str = "now()",
473
    every_min: int = 1,
474
    window_min: int = 5,
475
    create_empty: bool = True,
476
) -> int:
477
    logging.info(
1✔
478
        (
479
            "Get 'ON' minutes (type: %s, host: %d, field: %d{field}, "
480
            "threshold: %.2f, start: %s, stop: %s, every: %smin, "
481
            "window: %dmin, create_empty: %s)"
482
        ),
483
        measure,
484
        hostname,
485
        field,
486
        threshold,
487
        start,
488
        stop,
489
        every_min,
490
        window_min,
491
        create_empty,
492
    )
493

494
    try:
1✔
495
        table_list = _fetch_data_impl(
1✔
496
            config,
497
            FLUX_QUERY,
498
            measure,
499
            hostname,
500
            field,
501
            start,
502
            stop,
503
            every_min,
504
            window_min,
505
            create_empty,
506
        )
507

508
        if len(table_list) == 0:
1✔
509
            return 0
1✔
510

511
        count = 0
1✔
512

513
        every_min = int(every_min)
1✔
514
        window_min = int(window_min)
1✔
515
        record_num = len(table_list[0].records)
1✔
516
        for i, record in enumerate(table_list[0].records):
1✔
517
            if create_empty and (window_min > every_min) and (i > record_num - 1 - (window_min - every_min)):
1✔
518
                # NOTE: timedMovingAverage を使うと、末尾に余分なデータが入るので取り除く
519
                continue
×
520

521
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
522
            # だとタイミングによって、先頭に None が入る
523
            if record.get_value() is None:
1✔
524
                continue
×
525
            if record.get_value() >= threshold:
1✔
526
                count += 1
1✔
527

528
        return count * int(every_min)
1✔
529
    except Exception:
1✔
530
        logging.exception("Failed to fetch data")
1✔
531
        return 0
1✔
532

533

534
def get_equip_mode_period(
1✔
535
    config: InfluxDBConfig,
536
    measure: str,
537
    hostname: str,
538
    field: str,
539
    threshold_list: list[float],
540
    start: str = "-30h",
541
    stop: str = "now()",
542
    every_min: int = 10,
543
    window_min: int = 10,
544
    create_empty: bool = True,
545
) -> list[list[Any]]:
546
    logging.info(
×
547
        "Get equipment mode period (type: %s, host: %s, field: %s, "
548
        "threshold: %s, start: %s, stop: %s, every: %dmin, "
549
        "window: %dmin, create_empty: %s)",
550
        measure,
551
        hostname,
552
        field,
553
        f"[{','.join(f'{v:.1f}' for v in threshold_list)}]",
554
        start,
555
        stop,
556
        every_min,
557
        window_min,
558
        create_empty,
559
    )
560

561
    try:
×
562
        table_list = _fetch_data_impl(
×
563
            config,
564
            FLUX_QUERY,
565
            measure,
566
            hostname,
567
            field,
568
            start,
569
            stop,
570
            every_min,
571
            window_min,
572
            create_empty,
573
        )
574

575
        if len(table_list) == 0:
×
576
            return []
×
577

578
        # NOTE: 常時冷却と間欠冷却の期間を求める
579
        on_range = []
×
580
        state = -1
×
581
        start_time = None
×
582
        prev_time = None
×
583
        localtime_offset = datetime.timedelta(hours=9)
×
584

585
        for record in table_list[0].records:
×
586
            # NOTE: aggregateWindow(createEmpty: true) と fill(usePrevious: true) の組み合わせ
587
            # だとタイミングによって、先頭に None が入る
588
            if record.get_value() is None:
×
589
                logging.debug("DELETE %s", record.get_time() + localtime_offset)
×
590
                continue
×
591

592
            is_idle = True
×
593
            for i in range(len(threshold_list)):
×
594
                if record.get_value() > threshold_list[i]:
×
595
                    if state != i:
×
596
                        if state != -1:
×
597
                            assert start_time is not None  # noqa: S101
×
598
                            assert prev_time is not None  # noqa: S101
×
599
                            on_range.append(
×
600
                                [
601
                                    start_time + localtime_offset,
602
                                    prev_time + localtime_offset,
603
                                    state,
604
                                ]
605
                            )
606
                        state = i
×
607
                        start_time = record.get_time()
×
608
                    is_idle = False
×
609
                    break
×
610
            if is_idle and state != -1:
×
611
                assert start_time is not None  # noqa: S101
×
612
                assert prev_time is not None  # noqa: S101
×
613
                on_range.append(
×
614
                    [
615
                        start_time + localtime_offset,
616
                        prev_time + localtime_offset,
617
                        state,
618
                    ]
619
                )
620
                state = -1
×
621
                start_time = record.get_time()
×
622

623
            prev_time = record.get_time()
×
624

625
        if state != -1:
×
626
            assert start_time is not None  # noqa: S101
×
627
            on_range.append(
×
628
                [
629
                    start_time + localtime_offset,
630
                    table_list[0].records[-1].get_time() + localtime_offset,
631
                    state,
632
                ]
633
            )
634
        return on_range
×
635
    except Exception:
×
636
        logging.exception("Failed to fetch data")
×
637
        return []
×
638

639

640
def get_sum(
1✔
641
    config: InfluxDBConfig,
642
    measure: str,
643
    hostname: str,
644
    field: str,
645
    start: str = "-3m",
646
    stop: str = "now()",
647
    every_min: int = 1,
648
    window_min: int = 3,
649
) -> float:
650
    try:
1✔
651
        table_list = _fetch_data_impl(
1✔
652
            config, FLUX_SUM_QUERY, measure, hostname, field, start, stop, every_min, window_min, True
653
        )
654

655
        value_list = table_list.to_values(columns=["count", "sum"])
1✔
656

657
        if len(value_list) == 0:
1✔
658
            return 0
1✔
659
        else:
660
            sum_value = value_list[0][1]
1✔
661
            if isinstance(sum_value, int | float):
1✔
662
                return float(sum_value)
1✔
663
            logging.warning("Unexpected sum value type: %s (value=%s)", type(sum_value).__name__, sum_value)
×
664
            return 0
×
665
    except Exception:
1✔
666
        logging.exception("Failed to fetch data")
1✔
667
        return 0
1✔
668

669

670
def get_day_sum(
1✔
671
    config: InfluxDBConfig,
672
    measure: str,
673
    hostname: str,
674
    field: str,
675
    days: int,
676
    day_before: int = 0,
677
    day_offset: int = 0,
678
    every_min: int = 1,
679
    window_min: int = 5,
680
) -> float:
681
    now = my_lib.time.now()
1✔
682

683
    if day_before == 0:
1✔
684
        start = f"-{day_offset + days - 1}d{now.hour}h{now.minute}m"
1✔
685
        stop = f"-{day_offset}d"
1✔
686
    else:
687
        start = f"-{day_before + day_offset + days - 1}d{now.hour}h{now.minute}m"
×
688
        stop = f"-{day_before + day_offset - 1}d{now.hour}h{now.minute}m"
×
689

690
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
691

692

693
def get_hour_sum(
1✔
694
    config: InfluxDBConfig,
695
    measure: str,
696
    hostname: str,
697
    field: str,
698
    hours: int,
699
    day_offset: int = 0,
700
    every_min: int = 1,
701
    window_min: int = 1,
702
) -> float:
703
    start = f"-{day_offset * 24 + hours}h"
1✔
704
    stop = f"-{day_offset * 24}h"
1✔
705

706
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
707

708

709
def get_minute_sum(
1✔
710
    config: InfluxDBConfig,
711
    measure: str,
712
    hostname: str,
713
    field: str,
714
    minutes: int,
715
    day_offset: int = 0,
716
    every_min: int = 1,
717
    window_min: int = 1,
718
) -> float:
719
    start = f"-{day_offset * 24 * 60 + minutes}m"
1✔
720
    stop = f"-{day_offset * 24 * 60}m"
1✔
721

722
    return get_sum(config, measure, hostname, field, start, stop, every_min, window_min)
1✔
723

724

725
def get_last_event(
1✔
726
    config: InfluxDBConfig, measure: str, hostname: str, field: str, start: str = "-7d"
727
) -> datetime.datetime | None:
728
    try:
1✔
729
        table_list = _fetch_data_impl(
1✔
730
            config, FLUX_EVENT_QUERY, measure, hostname, field, start, "now()", 0, 0, False
731
        )
732

733
        value_list = table_list.to_values(columns=["_time"])
1✔
734

735
        if len(value_list) == 0:
1✔
736
            return None
1✔
737
        else:
738
            time_value = value_list[0][0]
1✔
739
            if isinstance(time_value, datetime.datetime):
1✔
740
                return time_value
1✔
741
            logging.warning(
×
742
                "Unexpected time value type: %s (value=%s)", type(time_value).__name__, time_value
743
            )
744
            return None
×
745
    except Exception:
1✔
746
        logging.exception("Failed to fetch data")
1✔
747
        return None
1✔
748

749

750
def dump_data(data: SensorDataResult) -> None:
1✔
751
    for i in range(len(data.time)):
1✔
752
        logging.info("%s: %s", data.time[i], data.value[i])
1✔
753

754

755
if __name__ == "__main__":
756
    # TEST Code
757
    import docopt
758

759
    import my_lib.config
760
    import my_lib.logger
761
    import my_lib.pretty
762

763
    def get_config(config, dotted_key):
764
        keys = dotted_key.split(".")
765
        value = config
766

767
        for key in keys:
768
            value = value[key]
769

770
        return value
771

772
    assert __doc__ is not None  # noqa: S101
773
    args = docopt.docopt(__doc__)
774

775
    config_file = args["-c"]
776
    mode = args["-m"]
777
    every = args["-e"]
778
    window = args["-w"]
779
    infxlux_db_spec = args["-i"]
780
    sensor_spec = args["-s"]
781
    field_name = args["-f"]
782
    period = int(args["-p"])
783
    debug_mode = args["-D"]
784

785
    my_lib.logger.init("test", level=logging.DEBUG if debug_mode else logging.INFO)
786

787
    config = my_lib.config.load(config_file)
788

789
    db_config = InfluxDBConfig.parse(get_config(config, infxlux_db_spec))
790
    sensor_config = get_config(config, sensor_spec)
791

792
    logging.info("DB config: %s", my_lib.pretty.format(db_config))
793
    logging.info("Sensor config: %s", my_lib.pretty.format(sensor_config))
794

795
    result: SensorDataResult | float
796
    if mode == "data":
797
        result = fetch_data(
798
            db_config,
799
            sensor_config["measure"],
800
            sensor_config["hostname"],
801
            field_name,
802
            start="-10m",
803
            stop="now()",
804
            every_min=1,
805
            window_min=3,
806
            create_empty=True,
807
            last=False,
808
        )
809
    elif mode == "day_sum":
810
        result = get_day_sum(
811
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
812
        )
813
    elif mode == "hour_sum":
814
        result = get_hour_sum(
815
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
816
        )
817
    elif mode == "minute_sum":
818
        result = get_minute_sum(
819
            db_config, sensor_config["measure"], sensor_config["hostname"], field_name, period
820
        )
821
    else:
822
        logging.error("Unknown mode: %s", mode)
823
        result = 0.0
824

825
    logging.info(my_lib.pretty.format(result))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc