• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SHAdd0WTAka / Zen-Ai-Pentest / 22777105964

06 Mar 2026 06:43PM UTC coverage: 9.997% (-0.003%) from 10.0%
22777105964

push

github

web-flow
Stabilize full-suite test collection and auth refresh flow (#195)

2 of 15 new or added lines in 3 files covered. (13.33%)

1 existing line in 1 file now uncovered.

2655 of 26558 relevant lines covered (10.0%)

0.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.87
/database/models.py
1
"""
2
SQLAlchemy Database Models - Optimized with Indexes
3

4
PostgreSQL Database Schema für Zen-AI-Pentest.
5
Performance optimizations:
6
- Strategic indexes for common queries
7
- Connection pooling configuration
8
- Query optimization hints
9
"""
10

11
import enum
2✔
12
import os
2✔
13
from datetime import datetime, timezone
2✔
14

15
from sqlalchemy import (
2✔
16
    JSON,
17
    Column,
18
    DateTime,
19
    Float,
20
    ForeignKey,
21
    Index,
22
    Integer,
23
    String,
24
    Text,
25
    create_engine,
26
)
27
from sqlalchemy.ext.declarative import declarative_base
2✔
28
from sqlalchemy.orm import relationship, sessionmaker
2✔
29

30
Base = declarative_base()
2✔
31

32
# ============================================================================
33
# ENUMS
34
# ============================================================================
35

36

37
class ScanStatus(str, enum.Enum):
2✔
38
    PENDING = "pending"
2✔
39
    RUNNING = "running"
2✔
40
    COMPLETED = "completed"
2✔
41
    FAILED = "failed"
2✔
42
    CANCELLED = "cancelled"
2✔
43

44

45
class Severity(str, enum.Enum):
2✔
46
    CRITICAL = "critical"
2✔
47
    HIGH = "high"
2✔
48
    MEDIUM = "medium"
2✔
49
    LOW = "low"
2✔
50
    INFO = "info"
2✔
51

52

53
class ReportFormat(str, enum.Enum):
2✔
54
    PDF = "pdf"
2✔
55
    HTML = "html"
2✔
56
    JSON = "json"
2✔
57
    XML = "xml"
2✔
58

59

60
class ReportStatus(str, enum.Enum):
2✔
61
    PENDING = "pending"
2✔
62
    GENERATING = "generating"
2✔
63
    COMPLETED = "completed"
2✔
64
    FAILED = "failed"
2✔
65

66

67
# ============================================================================
68
# MODELS
69
# ============================================================================
70

71

72
class User(Base):
2✔
73
    """Benutzer-Account with optimized indexes"""
74

75
    __tablename__ = "users"
2✔
76

77
    id = Column(Integer, primary_key=True, index=True)
2✔
78
    username = Column(String(100), unique=True, index=True, nullable=False)
2✔
79
    email = Column(String(255), unique=True, index=True, nullable=False)
2✔
80
    hashed_password = Column(String(255), nullable=False)
2✔
81
    role = Column(String(50), default="operator", index=True)
2✔
82
    is_active = Column(Integer, default=1, index=True)
2✔
83
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
84
    updated_at = Column(
2✔
85
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
86
    )
87

88
    # Relationships
89
    scans = relationship("Scan", back_populates="user", lazy="dynamic")
2✔
90
    reports = relationship("Report", back_populates="user", lazy="dynamic")
2✔
91

92
    # Composite index for common lookup pattern
93
    __table_args__ = (Index("ix_users_role_active", "role", "is_active"),)
2✔
94

95

96
class Scan(Base):
2✔
97
    """Pentest Scan with performance indexes"""
98

99
    __tablename__ = "scans"
2✔
100

101
    id = Column(Integer, primary_key=True, index=True)
2✔
102
    name = Column(String(255), nullable=False, index=True)
2✔
103
    target = Column(String(500), nullable=False, index=True)
2✔
104
    scan_type = Column(String(100), nullable=False, index=True)
2✔
105
    status = Column(String(50), default=ScanStatus.PENDING, index=True)
2✔
106
    config = Column(JSON, default={})
2✔
107
    result_summary = Column(Text)
2✔
108
    user_id = Column(Integer, ForeignKey("users.id"), index=True)
2✔
109
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
110
    started_at = Column(DateTime, index=True)
2✔
111
    completed_at = Column(DateTime, index=True)
2✔
112

113
    # Relationships
114
    user = relationship("User", back_populates="scans")
2✔
115
    findings = relationship(
2✔
116
        "Finding",
117
        back_populates="scan",
118
        cascade="all, delete-orphan",
119
        lazy="dynamic",
120
    )
121
    reports = relationship("Report", back_populates="scan", lazy="dynamic")
2✔
122

123
    # Composite indexes for common query patterns
124
    __table_args__ = (
2✔
125
        # For filtering by status and date range
126
        Index("ix_scans_status_created", "status", "created_at"),
127
        # For user scans sorted by date
128
        Index("ix_scans_user_created", "user_id", "created_at"),
129
        # For target-based lookups
130
        Index("ix_scans_target_type", "target", "scan_type"),
131
    )
132

133

134
class Finding(Base):
2✔
135
    """Sicherheits-Befund with optimized indexes"""
136

137
    __tablename__ = "findings"
2✔
138

139
    id = Column(Integer, primary_key=True, index=True)
2✔
140
    scan_id = Column(
2✔
141
        Integer, ForeignKey("scans.id"), nullable=False, index=True
142
    )
143
    title = Column(String(500), nullable=False)
2✔
144
    description = Column(Text)
2✔
145
    severity = Column(String(50), default=Severity.MEDIUM, index=True)
2✔
146
    cvss_score = Column(Float, index=True)
2✔
147
    cve_id = Column(String(100), index=True)
2✔
148
    evidence = Column(Text)
2✔
149
    remediation = Column(Text)
2✔
150
    tool = Column(String(100), index=True)
2✔
151
    target = Column(String(500), index=True)
2✔
152
    port = Column(Integer, index=True)
2✔
153
    service = Column(String(100))
2✔
154
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
155
    verified = Column(Integer, default=0, index=True)
2✔
156

157
    # Relationships
158
    scan = relationship("Scan", back_populates="findings")
2✔
159

160
    # Composite indexes for efficient querying
161
    __table_args__ = (
2✔
162
        # For severity filtering within a scan
163
        Index("ix_findings_scan_severity", "scan_id", "severity"),
164
        # For verified findings by severity
165
        Index("ix_findings_severity_verified", "severity", "verified"),
166
        # For tool-based analysis
167
        Index("ix_findings_tool_created", "tool", "created_at"),
168
        # For target-based searches
169
        Index("ix_findings_target_severity", "target", "severity"),
170
    )
171

172

173
class Report(Base):
2✔
174
    """Generierter Bericht"""
175

176
    __tablename__ = "reports"
2✔
177

178
    id = Column(Integer, primary_key=True, index=True)
2✔
179
    scan_id = Column(Integer, ForeignKey("scans.id"), index=True)
2✔
180
    user_id = Column(Integer, ForeignKey("users.id"), index=True)
2✔
181
    format = Column(String(50), default=ReportFormat.PDF)
2✔
182
    template = Column(String(100))
2✔
183
    status = Column(String(50), default=ReportStatus.PENDING, index=True)
2✔
184
    file_path = Column(String(1000))
2✔
185
    file_size = Column(Integer)
2✔
186
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
187
    generated_at = Column(DateTime)
2✔
188

189
    # Relationships
190
    scan = relationship("Scan", back_populates="reports")
2✔
191
    user = relationship("User", back_populates="reports")
2✔
192

193
    __table_args__ = (
2✔
194
        Index("ix_reports_status_created", "status", "created_at"),
195
    )
196

197

198
class VulnerabilityDB(Base):
2✔
199
    """Lokale CVE/Exploit-Datenbank with full-text search support"""
200

201
    __tablename__ = "vulnerabilities"
2✔
202

203
    id = Column(Integer, primary_key=True, index=True)
2✔
204
    cve_id = Column(String(100), unique=True, index=True, nullable=False)
2✔
205
    title = Column(String(500))
2✔
206
    description = Column(Text)
2✔
207
    severity = Column(String(50), index=True)
2✔
208
    cvss_score = Column(Float, index=True)
2✔
209
    epss_score = Column(Float, index=True)
2✔
210
    affected_products = Column(JSON)
2✔
211
    references = Column(JSON)
2✔
212
    exploits = Column(JSON)
2✔
213
    updated_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
214

215
    __table_args__ = (
2✔
216
        Index("ix_vulns_severity_cvss", "severity", "cvss_score"),
217
        Index("ix_vulns_cve_updated", "cve_id", "updated_at"),
218
    )
219

220

221
class Asset(Base):
2✔
222
    """Asset-Management with geographic and network indexes"""
223

224
    __tablename__ = "assets"
2✔
225

226
    id = Column(Integer, primary_key=True, index=True)
2✔
227
    name = Column(String(255), nullable=False, index=True)
2✔
228
    asset_type = Column(String(100), index=True)
2✔
229
    ip_address = Column(String(50), index=True)
2✔
230
    hostname = Column(String(255), index=True)
2✔
231
    os = Column(String(100))
2✔
232
    services = Column(JSON)
2✔
233
    owner = Column(String(255), index=True)
2✔
234
    criticality = Column(String(50), default="medium", index=True)
2✔
235
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
236
    last_scanned = Column(DateTime, index=True)
2✔
237

238
    __table_args__ = (
2✔
239
        Index("ix_assets_type_criticality", "asset_type", "criticality"),
240
        Index("ix_assets_owner_scanned", "owner", "last_scanned"),
241
    )
242

243

244
class AuditLog(Base):
2✔
245
    """Audit-Logging für Compliance with time-series indexing"""
246

247
    __tablename__ = "audit_logs"
2✔
248

249
    id = Column(Integer, primary_key=True, index=True)
2✔
250
    user_id = Column(Integer, ForeignKey("users.id"), index=True)
2✔
251
    action = Column(String(100), nullable=False, index=True)
2✔
252
    resource_type = Column(String(100), index=True)
2✔
253
    resource_id = Column(Integer, index=True)
2✔
254
    details = Column(JSON)
2✔
255
    ip_address = Column(String(50))
2✔
256
    user_agent = Column(String(500))
2✔
257
    timestamp = Column(DateTime, default=datetime.utcnow, index=True)
2✔
258

259
    __table_args__ = (
2✔
260
        # For time-based audit queries
261
        Index("ix_audit_user_timestamp", "user_id", "timestamp"),
262
        Index("ix_audit_action_timestamp", "action", "timestamp"),
263
        Index("ix_audit_resource", "resource_type", "resource_id"),
264
    )
265

266

267
class Notification(Base):
2✔
268
    """Benachrichtigungen with read status indexing"""
269

270
    __tablename__ = "notifications"
2✔
271

272
    id = Column(Integer, primary_key=True, index=True)
2✔
273
    user_id = Column(Integer, ForeignKey("users.id"), index=True)
2✔
274
    type = Column(String(50), index=True)
2✔
275
    title = Column(String(255))
2✔
276
    message = Column(Text)
2✔
277
    read = Column(Integer, default=0, index=True)
2✔
278
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
279

280
    __table_args__ = (
2✔
281
        # For unread notifications per user
282
        Index("ix_notifications_user_read", "user_id", "read", "created_at"),
283
    )
284

285

286
class ToolConfig(Base):
2✔
287
    """Tool-Konfigurationen pro User/Team"""
288

289
    __tablename__ = "tool_configs"
2✔
290

291
    id = Column(Integer, primary_key=True, index=True)
2✔
292
    user_id = Column(Integer, ForeignKey("users.id"), index=True)
2✔
293
    tool_name = Column(String(100), nullable=False, index=True)
2✔
294
    config = Column(JSON, default={})
2✔
295
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
2✔
296
    updated_at = Column(
2✔
297
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
298
    )
299

300
    __table_args__ = (
2✔
301
        Index("ix_toolconfig_user_tool", "user_id", "tool_name"),
302
    )
303

304

305
# ============================================================================
306
# DATABASE CONNECTION - OPTIMIZED
307
# ============================================================================
308

309
# PostgreSQL URL - in production aus Umgebungsvariablen
310
DATABASE_URL = os.getenv(
2✔
311
    "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/zen_pentest"
312
)
313

314
# =============================================================================
315
# Connection Pool Configuration - Optimized
316
# =============================================================================
317
POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10"))
2✔
318
MAX_OVERFLOW = int(os.getenv("DB_MAX_OVERFLOW", "20"))
2✔
319
POOL_TIMEOUT = int(os.getenv("DB_POOL_TIMEOUT", "30"))
2✔
320
POOL_RECYCLE = int(os.getenv("DB_POOL_RECYCLE", "3600"))
2✔
321
POOL_PRE_PING = os.getenv("DB_POOL_PRE_PING", "true").lower() == "true"
2✔
322

323
# Connection Pool arguments - Optimized for production
324
engine_args = {
2✔
325
    "pool_size": POOL_SIZE,
326
    "max_overflow": MAX_OVERFLOW,
327
    "pool_timeout": POOL_TIMEOUT,
328
    "pool_recycle": POOL_RECYCLE,
329
    "pool_pre_ping": POOL_PRE_PING,
330
    "pool_use_lifo": True,  # LIFO for better cache locality
331
    "echo": os.getenv("DB_ECHO", "false").lower() == "true",
332
}
333

334
# Connection arguments for PostgreSQL
335
connect_args = {
2✔
336
    "connect_timeout": 10,
337
    "application_name": "zen-ai-pentest",
338
}
339

340
try:
2✔
341
    if DATABASE_URL.startswith("sqlite"):
2✔
342
        engine = create_engine(
2✔
343
            DATABASE_URL,
344
            connect_args={"check_same_thread": False},
345
            **{
346
                k: v
347
                for k, v in engine_args.items()
348
                if k not in ["pool_use_lifo", "pool_pre_ping"]
349
            },
350
        )
351
    else:
352
        # Try PostgreSQL with optimized settings
NEW
353
        print(
×
354
            f"Connecting to PostgreSQL with pool_size={POOL_SIZE}, max_overflow={MAX_OVERFLOW}"
355
        )
NEW
356
        engine = create_engine(
×
357
            DATABASE_URL,
358
            **engine_args,
359
            connect_args=connect_args,
360
        )
UNCOV
361
except ImportError:
×
362
    # Fallback auf SQLite für lokale Entwicklung/Tests
363
    print(
×
364
        "Warning: PostgreSQL/psycopg2 not available. Using SQLite fallback (zen_pentest.db)"
365
    )
366
    DATABASE_URL = "sqlite:///./zen_pentest.db"
×
367
    engine = create_engine(
×
368
        DATABASE_URL,
369
        connect_args={"check_same_thread": False},
370
        **{
371
            k: v
372
            for k, v in engine_args.items()
373
            if k not in ["pool_use_lifo", "pool_pre_ping"]
374
        },
375
    )
376
except Exception as e:
×
377
    print(
×
378
        f"Warning: PostgreSQL connection failed ({e}). Using SQLite fallback (zen_pentest.db)"
379
    )
380
    DATABASE_URL = "sqlite:///./zen_pentest.db"
×
381
    engine = create_engine(
×
382
        DATABASE_URL,
383
        connect_args={"check_same_thread": False},
384
        **{
385
            k: v
386
            for k, v in engine_args.items()
387
            if k not in ["pool_use_lifo", "pool_pre_ping"]
388
        },
389
    )
390

391
# Optimized session factory
392
SessionLocal = sessionmaker(
2✔
393
    autocommit=False,
394
    autoflush=False,
395
    bind=engine,
396
    expire_on_commit=False,  # Better performance for read-heavy workloads
397
)
398

399

400
def init_db():
2✔
401
    """Initialisiert die Datenbank mit all indexes"""
402
    Base.metadata.create_all(bind=engine)
×
403

404

405
def get_db():
2✔
406
    """Dependency für FastAPI with automatic cleanup"""
407
    db = SessionLocal()
×
408
    try:
×
409
        yield db
×
410
    finally:
411
        db.close()
×
412

413

414
# ============================================================================
415
# OPTIMIZED CRUD OPERATIONS
416
# ============================================================================
417

418

419
def create_scan(
2✔
420
    db, name: str, target: str, scan_type: str, config: dict, user_id: int
421
):
422
    """Erstellt neuen Scan"""
423
    db_scan = Scan(
×
424
        name=name,
425
        target=target,
426
        scan_type=scan_type,
427
        config=config,
428
        user_id=user_id,
429
    )
430
    db.add(db_scan)
×
431
    db.commit()
×
432
    db.refresh(db_scan)
×
433
    return db_scan
×
434

435

436
def get_scan(db, scan_id: int):
2✔
437
    """Holt Scan by ID - uses index on id"""
438
    return db.query(Scan).filter(Scan.id == scan_id).first()
×
439

440

441
def get_scans(db, skip: int = 0, limit: int = 100, status: str = None):
2✔
442
    """Listet Scans auf - uses composite index"""
443
    query = db.query(Scan)
×
444
    if status:
×
445
        query = query.filter(Scan.status == status)
×
446
    # Use the composite index (status, created_at) or (user_id, created_at)
447
    return (
×
448
        query.order_by(Scan.created_at.desc()).offset(skip).limit(limit).all()
449
    )
450

451

452
def get_scans_by_user(db, user_id: int, skip: int = 0, limit: int = 100):
2✔
453
    """Get scans for a specific user - uses composite index"""
454
    return (
×
455
        db.query(Scan)
456
        .filter(Scan.user_id == user_id)
457
        .order_by(Scan.created_at.desc())
458
        .offset(skip)
459
        .limit(limit)
460
        .all()
461
    )
462

463

464
def update_scan_status(db, scan_id: int, status: str, result: dict = None):
2✔
465
    """Aktualisiert Scan-Status"""
466
    scan = db.query(Scan).filter(Scan.id == scan_id).first()
×
467
    if scan:
×
468
        scan.status = status
×
469
        if result:
×
470
            scan.result_summary = str(result)
×
471
        if status == "running":
×
472
            scan.started_at = datetime.now(timezone.utc)
×
473
        if status in ["completed", "failed"]:
×
474
            scan.completed_at = datetime.now(timezone.utc)
×
475
        db.commit()
×
476
        db.refresh(scan)
×
477
    return scan
×
478

479

480
def create_finding(
2✔
481
    db,
482
    scan_id: int,
483
    title: str,
484
    description: str,
485
    severity: str = "medium",
486
    cvss_score: float = None,
487
    evidence: str = None,
488
    tool: str = None,
489
    target: str = None,
490
):
491
    """Erstellt neuen Befund"""
492
    db_finding = Finding(
×
493
        scan_id=scan_id,
494
        title=title,
495
        description=description,
496
        severity=severity,
497
        cvss_score=cvss_score,
498
        evidence=evidence,
499
        tool=tool,
500
        target=target,
501
    )
502
    db.add(db_finding)
×
503
    db.commit()
×
504
    db.refresh(db_finding)
×
505
    return db_finding
×
506

507

508
def bulk_create_findings(db, findings_data: list):
2✔
509
    """Efficiently create multiple findings using bulk insert"""
510
    db.bulk_insert_mappings(Finding, findings_data)
×
511
    db.commit()
×
512

513

514
def get_findings(db, scan_id: int, severity: str = None):
2✔
515
    """Holt Befunde für Scan - uses composite index"""
516
    query = db.query(Finding).filter(Finding.scan_id == scan_id)
×
517
    if severity:
×
518
        query = query.filter(Finding.severity == severity)
×
519
    return query.order_by(Finding.created_at.desc()).all()
×
520

521

522
def get_findings_by_severity(db, severity: str, limit: int = 100):
2✔
523
    """Get findings filtered by severity - uses index"""
524
    return (
×
525
        db.query(Finding)
526
        .filter(Finding.severity == severity)
527
        .order_by(Finding.created_at.desc())
528
        .limit(limit)
529
        .all()
530
    )
531

532

533
def create_report(db, scan_id: int, format: str, template: str, user_id: int):
2✔
534
    """Erstellt Report-Eintrag"""
535
    db_report = Report(
×
536
        scan_id=scan_id,
537
        format=format,
538
        template=template,
539
        user_id=user_id,
540
        status="pending",
541
    )
542
    db.add(db_report)
×
543
    db.commit()
×
544
    db.refresh(db_report)
×
545
    return db_report
×
546

547

548
def get_reports(db, skip: int = 0, limit: int = 100):
2✔
549
    """Listet Reports auf"""
550
    return (
×
551
        db.query(Report)
552
        .order_by(Report.created_at.desc())
553
        .offset(skip)
554
        .limit(limit)
555
        .all()
556
    )
557

558

559
def create_audit_log(
2✔
560
    db,
561
    user_id: int,
562
    action: str,
563
    resource_type: str,
564
    resource_id: int,
565
    details: dict,
566
    ip_address: str = None,
567
):
568
    """Erstellt Audit-Log-Eintrag"""
569
    log = AuditLog(
×
570
        user_id=user_id,
571
        action=action,
572
        resource_type=resource_type,
573
        resource_id=resource_id,
574
        details=details,
575
        ip_address=ip_address,
576
    )
577
    db.add(log)
×
578
    db.commit()
×
579

580

581
# Default Admin User erstellen
582
def create_default_admin():
2✔
583
    """Erstellt default Admin-User"""
584
    db = SessionLocal()
×
585
    try:
×
586
        from passlib.context import CryptContext
×
587

588
        pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
×
589

590
        admin = db.query(User).filter(User.username == "admin").first()
×
591
        if not admin:
×
592
            hashed = pwd_context.hash("admin")
×
593
            admin = User(
×
594
                username="admin",
595
                email="admin@zen-pentest.local",
596
                hashed_password=hashed,
597
                role="admin",
598
            )
599
            db.add(admin)
×
600
            db.commit()
×
601
            print("Default admin user created (admin/admin)")
×
602
    finally:
603
        db.close()
×
604

605

606
if __name__ == "__main__":
2✔
607
    init_db()
×
608
    create_default_admin()
×
609
    print("Database initialized with optimized indexes")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc