• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

binbashar / leverage / 8755878403

19 Apr 2024 03:17PM UTC coverage: 55.875% (-0.6%) from 56.457%
8755878403

push

github

Osvaldo Demo
update parsing logic

226 of 626 branches covered (36.1%)

Branch coverage included in aggregate %.

8 of 42 new or added lines in 1 file covered. (19.05%)

46 existing lines in 1 file now uncovered.

2608 of 4446 relevant lines covered (58.66%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.39
/leverage/checker/version_parser.py
1
import os
1✔
2
import re
1✔
3
from typing import Dict, Any
1✔
4

5
import hcl2
1✔
6

7

8
class VersionExtractor:
    """Extract Terraform core, provider and module versions from a parsed configuration.

    All methods are static. Results are accumulated into a single ``versions``
    mapping of ``name -> {"type": ..., "version": ..., ...}``. Core, provider
    and module entries share that mapping, so a later entry with the same name
    replaces an earlier one.
    """

    # Value of a ``ref`` query argument, up to the next argument separator.
    # Anything but a separator is accepted so refs such as ``feature/x`` survive intact.
    _REF_PATTERN = re.compile(r"ref=([^&#\s]+)")

    # Registry address: [<HOSTNAME>/]<NAMESPACE>/<NAME>/<PROVIDER>.
    # Segments accept hyphens (e.g. ``terraform-aws-modules``).
    _REGISTRY_PATTERN = re.compile(
        r"^(?:(?P<hostname>[\w\.:-]+)/)?(?P<namespace>[\w-]+)/(?P<name>[\w-]+)/(?P<provider>[\w-]+)$"
    )

    @staticmethod
    def extract_versions(tf_config: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
        """Return every version found in ``tf_config``.

        Args:
            tf_config: Parsed Terraform configuration (a dict with optional
                ``"terraform"`` and ``"module"`` keys holding lists of blocks).

        Returns:
            Mapping of entry name to its version details.
        """
        versions = {}
        VersionExtractor.extract_core_and_providers(tf_config, versions)
        VersionExtractor.extract_module_versions(tf_config, versions)
        return versions

    @staticmethod
    def extract_core_and_providers(tf_config: Dict[str, Any], versions: Dict[str, Dict[str, str]]):
        """Record ``required_version`` and ``required_providers`` of each ``terraform`` block.

        The core constraint is stored under the ``"terraform"`` key with type
        ``"core"``. ``versions`` is updated in place.
        """
        for terraform_block in tf_config.get("terraform", []):
            if not isinstance(terraform_block, dict):
                continue
            if "required_version" in terraform_block:
                versions["terraform"] = {"type": "core", "version": terraform_block["required_version"]}
            if "required_providers" in terraform_block:
                VersionExtractor.process_providers(terraform_block["required_providers"], versions)

    @staticmethod
    def process_providers(providers: Any, versions: Dict[str, Dict[str, str]]):
        """Handle ``required_providers`` given either as one dict or as a list of dicts.

        Any other shape (and any non-dict list element) is ignored.
        """
        if isinstance(providers, dict):
            VersionExtractor.extract_provider_versions(providers, versions)
        elif isinstance(providers, list):
            for provider_dict in providers:
                # Skip malformed entries instead of failing on ``.items()``.
                if isinstance(provider_dict, dict):
                    VersionExtractor.extract_provider_versions(provider_dict, versions)

    @staticmethod
    def extract_provider_versions(providers: Dict[str, Any], versions: Dict[str, Dict[str, str]]):
        """Record each provider's version constraint in ``versions``.

        A provider may be declared as a dict carrying a ``"version"`` key or as
        a bare constraint string. Dicts without ``"version"`` and values of any
        other type are skipped.
        """
        for provider, details in providers.items():
            if isinstance(details, dict) and "version" in details:
                versions[provider] = {"type": "provider", "version": details["version"]}
            elif isinstance(details, str):
                versions[provider] = {"type": "provider", "version": details}

    @staticmethod
    def extract_module_versions(tf_config: Dict[str, Any], versions: Dict[str, Dict[str, str]]):
        """Record source/version details for every ``module`` block.

        Each module is stored under its block name. Entries whose body is not a
        dict are skipped rather than raising.
        """
        for module in tf_config.get("module", []):
            if not isinstance(module, dict):
                continue
            for name, data in module.items():
                if not isinstance(data, dict):
                    continue
                source = data.get("source", "")
                explicit_version = data.get("version", None)
                versions[name] = VersionExtractor.parse_source(source, explicit_version)

    @staticmethod
    def _strip_v_prefix(version):
        """Drop a leading ``v``/``V`` only when it prefixes a number (``v1.2`` -> ``1.2``).

        Names that merely start with the letter (``validate``, ``vnext``) are
        returned unchanged, as are empty and non-string values.
        """
        if isinstance(version, str) and len(version) > 1 and version[0] in "vV" and version[1].isdigit():
            return version[1:]
        return version

    @staticmethod
    def _registry_address(source: str) -> str:
        """Return ``source`` without a trailing ``//<subdirectory>`` selector."""
        return source.split("//", 1)[0]

    @staticmethod
    def parse_source(source: str, explicit_version: str = None) -> Dict[str, str]:
        """Classify a module ``source`` and derive its version.

        Args:
            source: The module's ``source`` argument.
            explicit_version: The module's ``version`` argument, if any.

        Returns:
            A dict whose ``"type"`` is ``"local"``, ``"github"``, ``"registry"``
            or ``"other"``, plus the type-specific details.
        """
        # Local path detection
        if source.startswith(("./", "../")):
            return {"type": "local", "source": source, "version": explicit_version or "N/A"}
        # GitHub URL detection (SSH or HTTPS)
        if "github.com" in source:
            return VersionExtractor.parse_github_source(source, explicit_version)
        # Registry source detection: explicit registry host names...
        if any(marker in source for marker in ("terraform-registry", "registry.terraform.io")):
            return VersionExtractor.parse_registry_source(source, explicit_version)
        # ...or a bare registry address such as ``terraform-aws-modules/vpc/aws``.
        # Local paths must start with ./ or ../, so this shape is a registry address.
        if VersionExtractor._REGISTRY_PATTERN.match(VersionExtractor._registry_address(source)):
            return VersionExtractor.parse_registry_source(source, explicit_version)
        return {"type": "other", "source": source, "version": explicit_version or "unknown"}

    @staticmethod
    def parse_github_source(source: str, explicit_version: str) -> Dict[str, str]:
        """Describe a GitHub-hosted module.

        The version is taken from the ``ref`` query argument when present,
        otherwise from ``explicit_version``, otherwise ``"HEAD"``. The
        repository is the source with its query string removed.
        """
        ref_match = VersionExtractor._REF_PATTERN.search(source)
        version = ref_match.group(1) if ref_match else explicit_version
        version = VersionExtractor._strip_v_prefix(version)
        return {"type": "github", "repository": source.split("?")[0], "version": version or "HEAD"}

    @staticmethod
    def parse_registry_source(source: str, explicit_version: str) -> Dict[str, str]:
        """
        Parses Terraform Registry sources with or without a hostname.
        Formats:
        - <NAMESPACE>/<NAME>/<PROVIDER>
        - <HOSTNAME>/<NAMESPACE>/<NAME>/<PROVIDER>

        A trailing ``//<subdirectory>`` is ignored when matching and is not part
        of the reported module identifier. The version comes from
        ``explicit_version`` (``"latest"`` when absent). Sources that do not fit
        either format are returned with their raw ``source`` and version
        ``"unknown"`` when no explicit version is given.
        """
        match = VersionExtractor._REGISTRY_PATTERN.match(VersionExtractor._registry_address(source))
        if not match:
            return {"type": "registry", "source": source, "version": explicit_version or "unknown"}

        hostname = match.group("hostname")
        module_identifier = "/".join(match.group("namespace", "name", "provider"))
        if hostname:
            module_identifier = f"{hostname}/{module_identifier}"
        version = VersionExtractor._strip_v_prefix(explicit_version)
        return {
            "type": "registry",
            "module": module_identifier,
            "version": version or "latest",
            "hostname": hostname or "public",
        }
×
105

106

107
class TerraformFileParser:
    """Load a Terraform file from disk into a Python dictionary."""

    @staticmethod
    def load(file_path: str) -> Dict[str, Any]:
        """Parse the file at ``file_path`` with ``hcl2.load`` and return the result.

        Args:
            file_path: Path of the Terraform file to read.

        Returns:
            Whatever ``hcl2.load`` produces for the file's contents.

        Raises:
            OSError: If the file cannot be opened.
            Exception: Any parsing error raised by ``hcl2.load`` propagates unchanged.
        """
        # Terraform configuration is UTF-8; do not depend on the platform's
        # default encoding, which differs across operating systems and locales.
        with open(file_path, "r", encoding="utf-8") as tf_file:
            return hcl2.load(tf_file)
×
112

113

114
class VersionManager:
    """Walk a directory tree and aggregate version data from every ``.tf`` file."""

    def __init__(self, directory: str):
        """Store the root directory that ``find_versions`` scans."""
        self.directory = directory

    def find_versions(self) -> Dict[str, Dict[str, str]]:
        """Collect versions from all ``.tf`` files under ``self.directory``.

        Files are visited in sorted order (directories and file names), so when
        two files define an entry with the same name the winner is the same on
        every run and every filesystem. A file that fails to load or parse is
        reported through ``handle_error`` and skipped; the scan continues.

        Returns:
            Mapping of entry name to version details, merged across files
            (later files override earlier ones).
        """
        versions: Dict[str, Dict[str, str]] = {}
        for root, dirs, files in os.walk(self.directory):
            # Sorting in place also fixes the order in which os.walk descends.
            dirs.sort()
            for file_name in sorted(files):
                if not file_name.endswith(".tf"):
                    continue
                file_path = os.path.join(root, file_name)
                try:
                    tf_config = TerraformFileParser.load(file_path)
                    versions.update(VersionExtractor.extract_versions(tf_config))
                except Exception as e:
                    # Best effort: one unreadable file must not abort the whole scan.
                    self.handle_error(e, file_path)
        return versions

    @staticmethod
    def handle_error(e: Exception, file_path: str):
        """Report a per-file failure on standard output."""
        print(f"Error processing {file_path}: {e}")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc