• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SHAdd0WTAka / Zen-Ai-Pentest / 23093862715

14 Mar 2026 06:33PM UTC coverage: 2.997% (-11.7%) from 14.676%
23093862715

push

github

root
Merge branch 'main' of https://github.com/SHAdd0WTAka/Zen-Ai-Pentest

19 of 7462 branches covered (0.25%)

Branch coverage included in aggregate %.

2075 of 62397 relevant lines covered (3.33%)

0.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/integrations/__init__.py
1
"""
2
CI/CD and External Tool Integrations for Zen AI Pentest
3

4
This module provides integrations with:
5
- GitHub (Actions, Issues, PRs)
6
- GitLab (CI/CD, Issues)
7
- Jenkins (Pipelines)
8
- Slack (Notifications)
9
- JIRA (Issue tracking)
10

11
Usage:
12
    # GitHub Integration
13
    from integrations import GitHubIntegration
14

15
    github = GitHubIntegration(token="ghp_...")
16
    await github.create_security_issue(finding, repo="owner/repo")
17

18
    # Slack Notifications
19
    from integrations import SlackNotifier
20

21
    slack = SlackNotifier(webhook_url="https://hooks.slack.com/...")
22
    await slack.notify_scan_completed(results)
23

24
    # JIRA Integration
25
    from integrations import JiraIntegration
26

27
    jira = JiraIntegration(
28
        server="https://your-domain.atlassian.net",
29
        username="user@example.com",
30
        api_token="..."
31
    )
32
    await jira.create_finding_ticket(finding, project="SEC")
33
"""
34

35
import logging
×
36
from dataclasses import dataclass, field
×
37
from datetime import datetime
×
38
from enum import Enum
×
39
from typing import Any, Dict, List, Optional
×
40

41
logger = logging.getLogger(__name__)
×
42

43

44
class IntegrationStatus(Enum):
×
45
    """Status of an integration."""
46

47
    CONFIGURED = "configured"
×
48
    CONNECTED = "connected"
×
49
    ERROR = "error"
×
50
    DISABLED = "disabled"
×
51

52

53
@dataclass
×
54
class IntegrationConfig:
×
55
    """Configuration for an integration."""
56

57
    name: str
×
58
    enabled: bool = True
×
59
    config: Dict[str, Any] = field(default_factory=dict)
×
60
    status: IntegrationStatus = IntegrationStatus.DISABLED
×
61
    last_error: Optional[str] = None
×
62

63
    def to_dict(self) -> Dict[str, Any]:
×
64
        return {
×
65
            "name": self.name,
66
            "enabled": self.enabled,
67
            "status": self.status.value,
68
            "last_error": self.last_error,
69
        }
70

71

72
# GitHub Integration
73
class GitHubIntegration:
×
74
    """Integration with GitHub for security workflows."""
75

76
    def __init__(
×
77
        self,
78
        token: Optional[str] = None,
79
        base_url: str = "https://api.github.com",
80
    ):
81
        self.token = token
×
82
        self.base_url = base_url.rstrip("/")
×
83
        self.config = IntegrationConfig(name="github", enabled=bool(token))
×
84
        self._session = None
×
85

86
    async def _get_session(self):
×
87
        """Get or create aiohttp session."""
88
        if self._session is None:
×
89
            import aiohttp
×
90

91
            self._session = aiohttp.ClientSession(
×
92
                headers={
93
                    "Authorization": f"token {self.token}",
94
                    "Accept": "application/vnd.github.v3+json",
95
                }
96
            )
97
        return self._session
×
98

99
    async def test_connection(self) -> bool:
×
100
        """Test GitHub API connection."""
101
        if not self.token:
×
102
            return False
×
103

104
        try:
×
105
            session = await self._get_session()
×
106
            async with session.get(f"{self.base_url}/user") as resp:
×
107
                if resp.status == 200:
×
108
                    self.config.status = IntegrationStatus.CONNECTED
×
109
                    return True
×
110
                else:
111
                    self.config.status = IntegrationStatus.ERROR
×
112
                    return False
×
113
        except Exception as e:
×
114
            self.config.status = IntegrationStatus.ERROR
×
115
            self.config.last_error = str(e)
×
116
            logger.error(f"GitHub connection test failed: {e}")
×
117
            return False
×
118

119
    async def create_security_issue(
×
120
        self, finding: Dict[str, Any], repo: str, labels: List[str] = None
121
    ) -> Optional[Dict[str, Any]]:
122
        """Create a security issue in a GitHub repository."""
123
        if not self.token:
×
124
            logger.warning("GitHub token not configured")
×
125
            return None
×
126

127
        session = await self._get_session()
×
128

129
        severity = finding.get("severity", "Medium")
×
130
        title = f"[SECURITY] [{severity}] {finding.get('title', 'Security Finding')}"
×
131

132
        body = f"""## Security Finding Detected
×
133

134
**Severity:** {severity}
135
**Tool:** {finding.get("tool", "Zen AI Pentest")}
136
**Target:** {finding.get("target", "N/A")}
137

138
### Description
139
{finding.get("description", "No description available.")}
140

141
### Remediation
142
{finding.get("remediation", "No remediation steps provided.")}
143

144
---
145
*This issue was automatically created by Zen AI Pentest*
146
"""
147

148
        payload = {
×
149
            "title": title,
150
            "body": body,
151
            "labels": labels
152
            or ["security", "vulnerability", f"severity:{severity.lower()}"],
153
        }
154

155
        try:
×
156
            async with session.post(
×
157
                f"{self.base_url}/repos/{repo}/issues", json=payload
158
            ) as resp:
159
                if resp.status == 201:
×
160
                    return await resp.json()
×
161
                else:
162
                    logger.error(
×
163
                        f"Failed to create GitHub issue: {resp.status}"
164
                    )
165
                    return None
×
166
        except Exception as e:
×
167
            logger.error(f"GitHub API error: {e}")
×
168
            return None
×
169

170
    async def create_check_run(
×
171
        self,
172
        repo: str,
173
        name: str,
174
        head_sha: str,
175
        status: str = "completed",
176
        conclusion: str = "neutral",
177
        output: Dict[str, Any] = None,
178
    ) -> Optional[Dict[str, Any]]:
179
        """Create a check run for security scanning results."""
180
        if not self.token:
×
181
            return None
×
182

183
        session = await self._get_session()
×
184

185
        payload = {
×
186
            "name": name,
187
            "head_sha": head_sha,
188
            "status": status,
189
            "conclusion": conclusion,
190
            "output": output or {},
191
        }
192

193
        try:
×
194
            async with session.post(
×
195
                f"{self.base_url}/repos/{repo}/check-runs", json=payload
196
            ) as resp:
197
                if resp.status == 201:
×
198
                    return await resp.json()
×
199
                return None
×
200
        except Exception as e:
×
201
            logger.error(f"Failed to create check run: {e}")
×
202
            return None
×
203

204

205
# GitLab Integration
206
class GitLabIntegration:
×
207
    """Integration with GitLab CI/CD and Issues."""
208

209
    def __init__(
×
210
        self, token: Optional[str] = None, base_url: str = "https://gitlab.com"
211
    ):
212
        self.token = token
×
213
        self.base_url = base_url.rstrip("/")
×
214
        self.config = IntegrationConfig(name="gitlab", enabled=bool(token))
×
215
        self._session = None
×
216

217
    async def _get_session(self):
×
218
        """Get or create aiohttp session."""
219
        if self._session is None:
×
220
            import aiohttp
×
221

222
            self._session = aiohttp.ClientSession(
×
223
                headers={"PRIVATE-TOKEN": self.token}
224
            )
225
        return self._session
×
226

227
    async def test_connection(self) -> bool:
×
228
        """Test GitLab API connection."""
229
        if not self.token:
×
230
            return False
×
231

232
        try:
×
233
            session = await self._get_session()
×
234
            async with session.get(f"{self.base_url}/api/v4/user") as resp:
×
235
                if resp.status == 200:
×
236
                    self.config.status = IntegrationStatus.CONNECTED
×
237
                    return True
×
238
                return False
×
239
        except Exception as e:
×
240
            logger.error(f"GitLab connection test failed: {e}")
×
241
            return False
×
242

243
    async def create_issue(
×
244
        self,
245
        project_id: str,
246
        finding: Dict[str, Any],
247
        labels: List[str] = None,
248
    ) -> Optional[Dict[str, Any]]:
249
        """Create a security issue in GitLab."""
250
        if not self.token:
×
251
            return None
×
252

253
        session = await self._get_session()
×
254

255
        severity = finding.get("severity", "Medium")
×
256
        title = f"[SECURITY] [{severity}] {finding.get('title', 'Security Finding')}"
×
257

258
        description = f"""## Security Finding
×
259

260
**Severity:** {severity}
261
**Target:** {finding.get("target", "N/A")}
262

263
### Description
264
{finding.get("description", "No description available.")}
265

266
### Remediation
267
{finding.get("remediation", "No remediation steps provided.")}
268

269
---
270
*Generated by Zen AI Pentest*
271
"""
272

273
        payload = {
×
274
            "title": title,
275
            "description": description,
276
            "labels": ",".join(
277
                labels or ["security", f"severity::{severity.lower()}"]
278
            ),
279
        }
280

281
        try:
×
282
            async with session.post(
×
283
                f"{self.base_url}/api/v4/projects/{project_id}/issues",
284
                data=payload,
285
            ) as resp:
286
                if resp.status == 201:
×
287
                    return await resp.json()
×
288
                return None
×
289
        except Exception as e:
×
290
            logger.error(f"Failed to create GitLab issue: {e}")
×
291
            return None
×
292

293

294
# Jenkins Integration
295
class JenkinsIntegration:
×
296
    """Integration with Jenkins CI/CD."""
297

298
    def __init__(
×
299
        self,
300
        url: str,
301
        username: Optional[str] = None,
302
        api_token: Optional[str] = None,
303
    ):
304
        self.url = url.rstrip("/")
×
305
        self.username = username
×
306
        self.api_token = api_token
×
307
        self.config = IntegrationConfig(
×
308
            name="jenkins", enabled=bool(username and api_token)
309
        )
310

311
    async def trigger_job(
×
312
        self, job_name: str, parameters: Dict[str, Any] = None
313
    ) -> bool:
314
        """Trigger a Jenkins job."""
315
        import aiohttp
×
316

317
        auth = aiohttp.BasicAuth(self.username, self.api_token)
×
318

319
        async with aiohttp.ClientSession() as session:
×
320
            try:
×
321
                url = f"{self.url}/job/{job_name}/build"
×
322
                if parameters:
×
323
                    url += "WithParameters"
×
324

325
                async with session.post(
×
326
                    url, auth=auth, data=parameters or {}
327
                ) as resp:
328
                    return resp.status == 201
×
329
            except Exception as e:
×
330
                logger.error(f"Failed to trigger Jenkins job: {e}")
×
331
                return False
×
332

333

334
# Slack Notifier
335
class SlackNotifier:
×
336
    """Slack notification integration."""
337

338
    def __init__(
×
339
        self, webhook_url: Optional[str] = None, channel: Optional[str] = None
340
    ):
341
        self.webhook_url = webhook_url
×
342
        self.channel = channel
×
343
        self.config = IntegrationConfig(
×
344
            name="slack", enabled=bool(webhook_url)
345
        )
346

347
    async def notify_scan_started(
×
348
        self, target: str, scan_type: str = "security"
349
    ):
350
        """Notify that a scan has started."""
351
        message = {
×
352
            "text": "🔍 Security scan started",
353
            "blocks": [
354
                {
355
                    "type": "header",
356
                    "text": {
357
                        "type": "plain_text",
358
                        "text": "🔍 Zen AI Pentest - Scan Started",
359
                    },
360
                },
361
                {
362
                    "type": "section",
363
                    "fields": [
364
                        {"type": "mrkdwn", "text": f"*Target:*\n{target}"},
365
                        {"type": "mrkdwn", "text": f"*Type:*\n{scan_type}"},
366
                        {
367
                            "type": "mrkdwn",
368
                            "text": f"*Time:*\n{datetime.now().isoformat()}",
369
                        },
370
                    ],
371
                },
372
            ],
373
        }
374

375
        if self.channel:
×
376
            message["channel"] = self.channel
×
377

378
        await self._send(message)
×
379

380
    async def notify_scan_completed(
×
381
        self, results: Dict[str, Any], target: str
382
    ):
383
        """Notify that a scan has completed."""
384
        findings = results.get("findings", [])
×
385
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
×
386

387
        for finding in findings:
×
388
            sev = finding.get("severity", "unknown").lower()
×
389
            if sev in severity_counts:
×
390
                severity_counts[sev] += 1
×
391

392
        message = {
×
393
            "text": f"✅ Security scan completed for {target}",
394
            "blocks": [
395
                {
396
                    "type": "header",
397
                    "text": {
398
                        "type": "plain_text",
399
                        "text": "✅ Zen AI Pentest - Scan Completed",
400
                    },
401
                },
402
                {
403
                    "type": "section",
404
                    "fields": [
405
                        {"type": "mrkdwn", "text": f"*Target:*\n{target}"},
406
                        {
407
                            "type": "mrkdwn",
408
                            "text": f"*Total Findings:*\n{len(findings)}",
409
                        },
410
                        {
411
                            "type": "mrkdwn",
412
                            "text": f"*Critical:*\n{severity_counts['critical']}",
413
                        },
414
                        {
415
                            "type": "mrkdwn",
416
                            "text": f"*High:*\n{severity_counts['high']}",
417
                        },
418
                    ],
419
                },
420
            ],
421
        }
422

423
        if severity_counts["critical"] > 0:
×
424
            message["blocks"].append(
×
425
                {
426
                    "type": "section",
427
                    "text": {
428
                        "type": "mrkdwn",
429
                        "text": f"🚨 *Action Required:* {severity_counts['critical']} critical vulnerabilities found!",
430
                    },
431
                }
432
            )
433

434
        await self._send(message)
×
435

436
    async def notify_finding(self, finding: Dict[str, Any]):
×
437
        """Notify about a specific finding."""
438
        severity = finding.get("severity", "Unknown")
×
439
        emoji = {
×
440
            "Critical": "🚨",
441
            "High": "⚠️",
442
            "Medium": "⚡",
443
            "Low": "ℹ️",
444
        }.get(severity, "ℹ️")
445

446
        message = {
×
447
            "text": f"{emoji} Security Finding: {finding.get('title', 'Unknown')}",
448
            "blocks": [
449
                {
450
                    "type": "header",
451
                    "text": {
452
                        "type": "plain_text",
453
                        "text": f"{emoji} Security Finding Detected",
454
                    },
455
                },
456
                {
457
                    "type": "section",
458
                    "fields": [
459
                        {
460
                            "type": "mrkdwn",
461
                            "text": f"*Title:*\n{finding.get('title', 'N/A')}",
462
                        },
463
                        {"type": "mrkdwn", "text": f"*Severity:*\n{severity}"},
464
                        {
465
                            "type": "mrkdwn",
466
                            "text": f"*Target:*\n{finding.get('target', 'N/A')}",
467
                        },
468
                    ],
469
                },
470
                {
471
                    "type": "section",
472
                    "text": {
473
                        "type": "mrkdwn",
474
                        "text": f"*Description:*\n{finding.get('description', 'No description')[:500]}",
475
                    },
476
                },
477
            ],
478
        }
479

480
        await self._send(message)
×
481

482
    async def _send(self, message: Dict[str, Any]):
×
483
        """Send message to Slack webhook."""
484
        if not self.webhook_url:
×
485
            logger.debug("Slack webhook not configured, skipping notification")
486
            return
×
487

488
        import aiohttp
×
489

490
        async with aiohttp.ClientSession() as session:
×
491
            try:
×
492
                async with session.post(
×
493
                    self.webhook_url, json=message
494
                ) as resp:
495
                    if resp.status != 200:
×
496
                        logger.error(
×
497
                            f"Slack notification failed: {resp.status}"
498
                        )
499
            except Exception as e:
×
500
                logger.error(f"Failed to send Slack notification: {e}")
×
501

502

503
# JIRA Integration (wrapper around existing jira_client)
504
class JiraIntegration:
×
505
    """Integration with JIRA for issue tracking."""
506

507
    def __init__(
×
508
        self,
509
        server: str,
510
        username: Optional[str] = None,
511
        api_token: Optional[str] = None,
512
    ):
513
        self.server = server.rstrip("/")
×
514
        self.username = username
×
515
        self.api_token = api_token
×
516
        self.config = IntegrationConfig(
×
517
            name="jira", enabled=bool(username and api_token)
518
        )
519
        self._client = None
×
520

521
    def _get_client(self):
×
522
        """Get or create JIRA client."""
523
        if self._client is None and self.config.enabled:
×
524
            try:
×
525
                from .jira_client import JiraClient
×
526

527
                self._client = JiraClient(
×
528
                    base_url=self.server,
529
                    username=self.username,
530
                    api_token=self.api_token,
531
                )
532
            except Exception as e:
×
533
                logger.error(f"Failed to create JIRA client: {e}")
×
534
        return self._client
×
535

536
    async def test_connection(self) -> bool:
×
537
        """Test JIRA connection."""
538
        client = self._get_client()
×
539
        if not client:
×
540
            return False
×
541

542
        try:
×
543
            result = client.test_connection()
×
544
            self.config.status = (
×
545
                IntegrationStatus.CONNECTED
546
                if result
547
                else IntegrationStatus.ERROR
548
            )
549
            return result
×
550
        except Exception as e:
×
551
            self.config.status = IntegrationStatus.ERROR
×
552
            self.config.last_error = str(e)
×
553
            return False
×
554

555
    async def create_finding_ticket(
×
556
        self, finding: Dict[str, Any], project_key: str
557
    ) -> Optional[Dict[str, Any]]:
558
        """Create a ticket from a security finding."""
559
        client = self._get_client()
×
560
        if not client:
×
561
            logger.warning("JIRA not configured")
×
562
            return None
×
563

564
        try:
×
565
            return client.create_finding_ticket(finding, project_key)
×
566
        except Exception as e:
×
567
            logger.error(f"Failed to create JIRA ticket: {e}")
×
568
            return None
×
569

570

571
# Factory functions
572
def create_github_integration(config: Dict[str, Any]) -> GitHubIntegration:
×
573
    """Create GitHub integration from config."""
574
    return GitHubIntegration(
×
575
        token=config.get("token"),
576
        base_url=config.get("base_url", "https://api.github.com"),
577
    )
578

579

580
def create_gitlab_integration(config: Dict[str, Any]) -> GitLabIntegration:
×
581
    """Create GitLab integration from config."""
582
    return GitLabIntegration(
×
583
        token=config.get("token"),
584
        base_url=config.get("base_url", "https://gitlab.com"),
585
    )
586

587

588
def create_slack_notifier(config: Dict[str, Any]) -> SlackNotifier:
×
589
    """Create Slack notifier from config."""
590
    return SlackNotifier(
×
591
        webhook_url=config.get("webhook_url"), channel=config.get("channel")
592
    )
593

594

595
def create_jira_integration(config: Dict[str, Any]) -> JiraIntegration:
×
596
    """Create JIRA integration from config."""
597
    return JiraIntegration(
×
598
        server=config.get("server"),
599
        username=config.get("username"),
600
        api_token=config.get("api_token"),
601
    )
602

603

604
def load_integrations_from_config(
×
605
    config_path: str = "config/integrations.json",
606
) -> Dict[str, Any]:
607
    """Load all integrations from configuration file."""
608
    import json
×
609
    import os
×
610

611
    integrations = {}
×
612

613
    if not os.path.exists(config_path):
×
614
        logger.warning(f"Integration config not found: {config_path}")
×
615
        return integrations
×
616

617
    try:
×
618
        with open(config_path, "r") as f:
×
619
            config = json.load(f)
×
620

621
        if config.get("github", {}).get("enabled"):
×
622
            integrations["github"] = create_github_integration(
×
623
                config["github"]
624
            )
625

626
        if config.get("gitlab", {}).get("enabled"):
×
627
            integrations["gitlab"] = create_gitlab_integration(
×
628
                config["gitlab"]
629
            )
630

631
        if config.get("slack", {}).get("enabled"):
×
632
            integrations["slack"] = create_slack_notifier(config["slack"])
×
633

634
        if config.get("jira", {}).get("enabled"):
×
635
            integrations["jira"] = create_jira_integration(config["jira"])
×
636

637
        logger.info(f"Loaded {len(integrations)} integrations")
×
638

639
    except Exception as e:
×
640
        logger.error(f"Failed to load integrations: {e}")
×
641

642
    return integrations
×
643

644

645
__all__ = [
×
646
    # Integration classes
647
    "GitHubIntegration",
648
    "GitLabIntegration",
649
    "JenkinsIntegration",
650
    "SlackNotifier",
651
    "JiraIntegration",
652
    # Configuration
653
    "IntegrationConfig",
654
    "IntegrationStatus",
655
    # Factory functions
656
    "create_github_integration",
657
    "create_gitlab_integration",
658
    "create_slack_notifier",
659
    "create_jira_integration",
660
    "load_integrations_from_config",
661
]
662

663
__version__ = "2.0.0"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc