• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

joagonzalez / cyclops-devops-agent / 15571014953

10 Jun 2025 09:48PM UTC coverage: 75.613% (-10.8%) from 86.441%
15571014953

push

github

web-flow
Merge pull request #49 from joagonzalez/rc-v0.1.0

Rc v0.1.0

110 of 230 new or added lines in 11 files covered. (47.83%)

617 of 816 relevant lines covered (75.61%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.33
/src/tools/promql_executor_tool.py
1
import asyncio
1✔
2
from typing import List, Dict, Any, Optional
1✔
3
from src.services.prometheus import PrometheusClient
1✔
4

5

6
class PromQLExecutorTool:
    """Async callable that runs a batch of PromQL queries via a Prometheus client.

    The tool reads the queries from a state dict, executes them one after
    another, and writes the per-query outcomes plus an overall status back
    into the same dict.
    """

    # The annotation is a string so that defining the class does not require
    # the client class at runtime; only an object with an awaitable
    # ``query(str)`` method is actually used.
    def __init__(self, prometheus_client: "PrometheusClient") -> None:
        """Store the client whose ``query`` coroutine executes each PromQL string."""
        self.prometheus_client = prometheus_client

    @staticmethod
    def _error_entry(
        query: Any, message: str, raw_result: Any = None
    ) -> Dict[str, Any]:
        """Build a failed-query entry with the same keys as a successful one."""
        return {
            "query": query,
            "raw_result": raw_result,
            "parsed_result": {},
            "value": None,
            "status": "Error",
            "error": message,
        }

    async def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Execute every PromQL query listed in ``state["promql_list"]``.

        Expected input (keys read from ``state``):
            {
                "promql_list": ["first promql query", "second promql query", ...]
            }

        Keys written to ``state`` (the same dict object is returned):
            {
                "promql_status": "OK" if every query reported "OK",
                                 "Partial/Error" if at least one did not,
                                 "Error: no queries provided" if the list is
                                 missing, empty, or not a list,
                "promql_results": [
                    {
                        "query": the query as given,
                        "raw_result": the client's response (None on failure),
                        "parsed_result": response["data"] ({} if absent),
                        "value": extracted sample value, or None,
                        "status": response["status"] ("Error" if absent),
                        "error": message  # only on failed entries
                    },
                    ...
                ]
            }

        A query that is not a non-blank string, a client call that raises,
        or a response that is not a dict is recorded as an "Error" entry and
        does not stop the remaining queries from running.
        """
        queries = state.get("promql_list", [])

        if not isinstance(queries, list) or not queries:
            print("[PromQLExecutorTool] No valid 'promql_list' found in state.")
            state["promql_results"] = []
            state["promql_status"] = "Error: no queries provided"
            return state

        results = []

        for query in queries:
            if not isinstance(query, str) or not query.strip():
                results.append(self._error_entry(query, "Invalid or empty query"))
                continue

            # Isolate failures per query: one unreachable/failed request must
            # not discard the results already collected for the others.
            # ``Exception`` does not cover task cancellation, so cancelling
            # the surrounding task still propagates.
            try:
                result = await self.prometheus_client.query(query)
            except Exception as exc:
                print(f"[PromQLExecutorTool] {query} → Error: {exc}")
                results.append(self._error_entry(query, str(exc)))
                continue

            if not isinstance(result, dict):
                # Everything below relies on dict access; report instead of
                # failing with AttributeError on ``result.get``.
                print(f"[PromQLExecutorTool] {query} → Error: unexpected response")
                results.append(
                    self._error_entry(
                        query,
                        f"Unexpected response type: {type(result).__name__}",
                        raw_result=result,
                    )
                )
                continue

            print(f"[PromQLExecutorTool] {query} → {result.get('status')}")

            value = None
            if result.get("status") == "OK":
                try:
                    # Second element of the first result's "value" pair.
                    # NOTE(review): presumably the [timestamp, value] sample
                    # of an instant-vector response nested under the
                    # client's own "data" wrapper; confirm against the client.
                    value = result["data"]["data"]["result"][0]["value"][1]
                except (KeyError, IndexError, TypeError):
                    # Best effort: a response without that shape (e.g. an
                    # empty result set) simply has no single value.
                    pass

            results.append({
                "query": query,
                "raw_result": result,
                "parsed_result": result.get("data", {}),
                "value": value,
                "status": result.get("status", "Error"),
            })

        state["promql_results"] = results
        state["promql_status"] = (
            "OK" if all(r["status"] == "OK" for r in results) else "Partial/Error"
        )

        return state
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc