• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

x4dr / Okysa / 21290232389

23 Jan 2026 02:50PM UTC coverage: 79.043% (+0.08%) from 78.966%
21290232389

push

github

x4dr
nossinames as foreignkey

10 of 19 new or added lines in 3 files covered. (52.63%)

2 existing lines in 1 file now uncovered.

2727 of 3450 relevant lines covered (79.04%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.76
/scripts/install.py
1
#!/usr/bin/env python3
2
import argparse
1✔
3
import getpass
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import secrets
1✔
8
import subprocess
1✔
9
from pathlib import Path
1✔
10

11

12
def get_input(prompt, default=None):
    """Ask the user a question on stdin and return the stripped answer.

    When *default* is truthy it is shown in square brackets after the prompt
    and returned whenever the user submits an empty (or whitespace-only)
    line.  A falsy *default* (``None`` or ``""``) means there is no fallback:
    the stripped answer is returned as typed, even if it is empty.
    """
    if not default:
        return input(f"{prompt}: ").strip()

    answer = input(f"{prompt} [{default}]: ").strip()
    return answer or default
×
17

18

19
def get_current_user():
    """Return the login name of the user running the installer.

    ``os.getlogin()`` is tried first; if it raises ``OSError`` the name
    reported by ``getpass.getuser()`` is used instead.
    """
    try:
        login = os.getlogin()
    except OSError:
        login = getpass.getuser()
    return login
×
24

25

26
def detect_domains(sites_enabled=Path("/etc/nginx/sites-enabled")):
    """Map each domain named by an enabled Nginx site to its config file.

    Args:
        sites_enabled: Directory holding the enabled site configs.  Defaults
            to the standard ``/etc/nginx/sites-enabled`` location.

    Returns:
        A dict ``{domain: Path}``.  It is empty when the directory is
        missing.  Files whose name contains ``.bak.`` and entries that
        cannot be read as text are skipped, as are names starting with
        ``_`` (for example the catch-all ``_``).
    """
    domain_map = {}
    sites_enabled = Path(sites_enabled)
    if not sites_enabled.is_dir():
        return domain_map

    # Sorted so that, when two files name the same domain, the winner does
    # not depend on filesystem iteration order.
    for site in sorted(sites_enabled.iterdir()):
        if ".bak." in site.name:
            continue
        try:
            content = site.read_text()
        except (OSError, UnicodeDecodeError):
            # Unreadable entry (directory, permissions, binary): ignore it.
            continue

        # Match lines that start with server_name (ignoring comments)
        for names in re.findall(
            r"^\s*server_name\s+([^;]+);", content, re.MULTILINE
        ):
            for domain in names.split():
                if not domain.startswith("_"):
                    domain_map[domain] = site
    return domain_map
×
46

47

48
def cleanup_legacy_backups():
    """Delete old ``.bak.`` files left inside the Nginx site directories.

    Every entry of ``sites-enabled`` and ``sites-available`` whose name
    contains ``.bak.`` is removed with ``sudo rm``; a failing removal raises
    ``subprocess.CalledProcessError``.  Missing directories are skipped.
    """
    print("Checking for legacy backup files in Nginx directories...")
    nginx_dirs = (
        Path("/etc/nginx/sites-enabled"),
        Path("/etc/nginx/sites-available"),
    )
    for nginx_dir in nginx_dirs:
        if not nginx_dir.exists():
            continue
        for entry in nginx_dir.iterdir():
            if ".bak." not in entry.name:
                continue
            print(f"Removing legacy backup: {entry}")
            subprocess.run(["sudo", "rm", str(entry)], check=True)
×
57

58

59
def _server_name_matches(code, domain):
    """Return True when *code* is a ``server_name`` directive listing *domain* exactly."""
    directive = re.match(r"\s*server_name\s+([^;]*)", code)
    return directive is not None and domain in directive.group(1).split()


def _find_server_block_end(lines, domain):
    """Return the index of the line closing the ``server`` block for *domain*, or None."""
    depth = 0  # brace depth inside the current server block; 0 means outside
    names_domain = False
    for idx, raw in enumerate(lines):
        code = raw.split("#", 1)[0]  # ignore comments, including commented-out directives
        if depth == 0:
            if re.match(r"\s*server\s*\{", code):
                depth = max(code.count("{") - code.count("}"), 0)
                names_domain = False
            continue

        names_domain = names_domain or _server_name_matches(code, domain)
        depth += code.count("{") - code.count("}")
        if depth <= 0:
            if names_domain:
                return idx
            depth = 0
    return None


def inject_nginx_config(filepath, domain, block):
    """Insert *block* into the ``server`` block for *domain* in an Nginx config.

    The function returns immediately when the block is already present or no
    server block names *domain* as an exact ``server_name`` entry; in both
    cases the file is left untouched and no backup is made.  Otherwise the
    file is copied to ``<nginx dir>/backups``, rewritten through ``sudo tee``
    with *block* placed just before the server block's closing brace, and
    validated with ``nginx -t``.  A failed write or validation restores the
    backup.

    Args:
        filepath: ``Path`` of the existing site configuration.
        domain: Domain that must appear as a whole ``server_name`` entry.
        block: Text to insert (for example a ``location`` block).

    Returns:
        True when the block is present after the call, False otherwise.

    Raises:
        subprocess.CalledProcessError: if creating the backup or restoring
            it fails.
    """
    content = filepath.read_text()
    if block.strip() in content:
        print(f"Webhook block already exists in {filepath.name}. Skipping injection.")
        return True

    lines = content.splitlines()
    end_idx = _find_server_block_end(lines, domain)
    if end_idx is None:
        print(f"Could not find a suitable server block for {domain} in {filepath.name}.")
        return False

    # Back up only once we know the file is actually going to be modified.
    backup_dir = filepath.parent.parent / "backups"
    if not backup_dir.exists():
        subprocess.run(["sudo", "mkdir", "-p", str(backup_dir)], check=True)

    backup = backup_dir / f"{filepath.name}.bak.{secrets.token_hex(4)}"
    subprocess.run(["sudo", "cp", str(filepath), str(backup)], check=True)
    print(f"Created Nginx backup: {backup}")

    # Insert just before the closing brace of the matched server block.
    new_content = "\n".join(lines[:end_idx] + [block] + lines[end_idx:])
    if content.endswith("\n"):
        new_content += "\n"  # splitlines() dropped the final newline

    try:
        subprocess.run(
            ["sudo", "tee", str(filepath)],
            input=new_content.encode(),
            stdout=subprocess.DEVNULL,
            check=True,
        )
        print("Injected webhook block into existing Nginx config.")
        if subprocess.run(["sudo", "nginx", "-t"]).returncode == 0:
            return True
        print("Nginx test failed! Rolling back...")
    except (OSError, subprocess.SubprocessError) as e:
        print(f"Error during injection: {e}")

    # Either the write or the validation failed: restore the original file.
    subprocess.run(["sudo", "cp", str(backup), str(filepath)], check=True)
    return False
×
136

137

138
def get_repo_nwo():
    """Return the current repository as ``owner/name`` using the ``gh`` CLI.

    Returns:
        The ``nameWithOwner`` string printed by ``gh repo view``, or ``None``
        when ``gh`` is not installed, exits with an error, or prints nothing.
    """
    try:
        res = subprocess.run(
            ["gh", "repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable ``gh`` binary.
        return None
    return res.stdout.strip() or None
×
149

150

151
def register_webhook(domain, existing_secret):
    """Create or update the GitHub webhook pointing at ``https://<domain>/webhook``.

    The user is asked whether to use the ``gh`` CLI; the function gives up
    (returning ``None``) when they decline, when ``gh`` is missing, when
    authentication fails or is declined, or when the repository name cannot
    be determined.  A hook whose configured URL equals the target URL is
    patched; otherwise a new one is posted.

    Args:
        domain: Host name used to build the webhook URL.
        existing_secret: Secret to reuse; a fresh one is generated when falsy.

    Returns:
        The secret configured on the hook, or ``None`` if nothing was
        registered.
    """
    print("\n--- GitHub Webhook Registration ---")
    if get_input("Use 'gh' CLI to register/update webhook?", "y").lower() != "y":
        return None

    gh_lookup = subprocess.run(["which", "gh"], capture_output=True)
    if gh_lookup.returncode != 0:
        print("Error: 'gh' CLI not found.")
        return None

    # Make sure gh is authenticated, offering an interactive login if not.
    auth_status = subprocess.run(["gh", "auth", "status"], capture_output=True)
    if auth_status.returncode != 0:
        print("GitHub CLI is not authenticated.")
        if get_input("Run 'gh auth login' now?", "y").lower() != "y":
            return None
        try:
            subprocess.run(["gh", "auth", "login"], check=True)
        except subprocess.CalledProcessError:
            return None

    repo = get_repo_nwo()
    if not repo:
        print("Could not determine repository name.")
        return None

    webhook_url = f"https://{domain}/webhook"

    # Reuse the caller's secret when there is one, otherwise mint a new one.
    secret = existing_secret or secrets.token_urlsafe(32)

    try:
        listing = subprocess.run(
            ["gh", "api", f"repos/{repo}/hooks"],
            capture_output=True,
            text=True,
            check=True,
        )

        # Look for a hook already pointing at our URL.
        current = None
        for hook in json.loads(listing.stdout):
            if hook.get("config", {}).get("url") == webhook_url:
                current = hook
                break

        payload = {
            "active": True,
            "events": ["workflow_run"],
            "config": {
                "url": webhook_url,
                "content_type": "json",
                "secret": secret,
            },
        }

        if current:
            print(f"Found existing webhook (ID: {current['id']}). Updating...")
            endpoint = f"repos/{repo}/hooks/{current['id']}"
            method = "PATCH"
        else:
            print("Creating new webhook...")
            payload["name"] = "web"
            endpoint = f"repos/{repo}/hooks"
            method = "POST"

        # The payload (including the secret) is passed on stdin, not argv.
        subprocess.run(
            ["gh", "api", endpoint, "--method", method, "--input", "-"],
            input=json.dumps(payload),
            text=True,
            check=True,
        )
        print(f"Successfully registered/updated webhook: {webhook_url}")
        return secret
    except Exception as e:
        print(f"Failed to manage webhook: {e}")
        return None
×
227

228

229
def uninstall(okysa_root):
    """Remove everything the installer placed on the system.

    Stops, disables and deletes the two systemd units, removes every Nginx
    site in ``sites-available`` carrying the ``# Managed by Okysa Installer``
    marker (together with its ``sites-enabled`` entry), deletes the sudoers
    drop-in and reloads systemd.  Individual removals are best-effort: their
    exit status is not checked and their stderr is discarded.

    Args:
        okysa_root: Project root; accepted for interface symmetry and not
            used by this function.
    """
    print("\n--- Uninstalling Okysa Deployment ---")

    # Stop and disable services
    services = ["okysa.service", "okysa-webhook.service"]
    for svc in services:
        print(f"Stopping and disabling {svc}...")
        subprocess.run(["sudo", "systemctl", "stop", svc], stderr=subprocess.DEVNULL)
        subprocess.run(["sudo", "systemctl", "disable", svc], stderr=subprocess.DEVNULL)
        subprocess.run(
            ["sudo", "rm", f"/etc/systemd/system/{svc}"], stderr=subprocess.DEVNULL
        )

    # Nginx
    sites_available = Path("/etc/nginx/sites-available")
    if sites_available.exists():
        for site in sites_available.iterdir():
            try:
                content = site.read_text()
                if "# Managed by Okysa Installer" in content:
                    print(f"Removing Nginx config: {site.name}")
                    subprocess.run(
                        ["sudo", "rm", f"/etc/nginx/sites-enabled/{site.name}"],
                        stderr=subprocess.DEVNULL,
                    )
                    subprocess.run(["sudo", "rm", str(site)], stderr=subprocess.DEVNULL)
            except (OSError, UnicodeDecodeError):
                # Skip unreadable entries, but let Ctrl-C / SystemExit through
                # (a bare ``except`` would have swallowed those as well).
                continue

    # Sudoers
    print("Removing sudoers entry...")
    subprocess.run(
        ["sudo", "rm", "/etc/sudoers.d/okysa-deploy"], stderr=subprocess.DEVNULL
    )

    subprocess.run(["sudo", "systemctl", "daemon-reload"])
    print("\nUninstallation complete.")
×
266

267

268
def load_env(env_path):
    """Parse a ``KEY=value`` file into a dict.

    Lines without ``=`` and lines starting with ``#`` are ignored.  Keys and
    values are stripped of surrounding whitespace, and values additionally of
    surrounding double and then single quotes.  A missing file yields an
    empty dict.
    """
    parsed = {}
    if not env_path.exists():
        return parsed

    with open(env_path, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry.startswith("#") or "=" not in entry:
                continue
            key, value = entry.split("=", 1)
            parsed[key.strip()] = value.strip().strip('"').strip("'")
    return parsed
×
278

279

280
def save_env(env_path, env_vars):
    """Write *env_vars* to *env_path* as ``KEY="value"`` lines, sorted by key.

    Any existing file is overwritten.
    """
    rendered = [f'{key}="{value}"\n' for key, value in sorted(env_vars.items())]
    with open(env_path, "w") as handle:
        handle.writelines(rendered)
×
284

285

286
def main():
    """Interactive entry point: install (default) or uninstall the deployment.

    With ``--uninstall`` the function delegates to ``uninstall`` and returns.
    Otherwise it detects paths, prompts for the ``.env`` settings, lets the
    user pick a domain, shows the planned actions and, after confirmation,
    writes the ``.env`` file, both systemd units, the Nginx configuration and
    a sudoers drop-in, then optionally starts the services and registers the
    GitHub webhook.
    """
    # Local import: only needed here, to locate the ``uv`` executable.
    import shutil

    parser = argparse.ArgumentParser(description="Okysa Deployment Installer")
    parser.add_argument(
        "--uninstall", action="store_true", help="Uninstall the deployment"
    )
    args = parser.parse_args()

    script_path = Path(__file__).resolve()
    okysa_root = script_path.parent.parent
    env_path = okysa_root / ".env"

    if args.uninstall:
        uninstall(okysa_root)
        return

    gamepack_root = okysa_root.parent / "GamePack"
    # shutil.which returns None on failure, whereas the output of a shelled
    # "which uv" may contain an error message that would end up in ExecStart.
    uv_path = shutil.which("uv") or "uv"

    print("--- Path Detection ---")
    print(f"Okysa Root:    {okysa_root}")
    if gamepack_root.exists():
        print(f"GamePack Root: {gamepack_root}")
    else:
        print(f"WARNING: GamePack not found at {gamepack_root}")
        gamepack_root = Path(
            get_input("Enter absolute path to GamePack", str(gamepack_root))
        )

    # 2. Gather Configuration
    print("\n--- Configuration ---")
    user = get_input("System user to run the bot", get_current_user())

    env_vars = load_env(env_path)

    # Required Env Vars
    env_vars["NOSSI"] = get_input(
        "NOSSI (domain for web services)", env_vars.get("NOSSI", "nossinet.cc")
    )
    env_vars["OLLAMA"] = get_input(
        "OLLAMA (URL for AI API)",
        env_vars.get("OLLAMA", "http://localhost:11434"),
    )
    env_vars["WIKI"] = get_input(
        "WIKI (path to wiki files)",
        env_vars.get("WIKI", str(okysa_root.parent / "wiki")),
    )

    # Database detection and prompts
    # Look for known historical locations
    home_db = Path.home() / "NN.db"
    possible_dbs = [
        okysa_root / "okysa.db",
        home_db,
        okysa_root / "Golconda" / "remind.db",
    ]
    db_files = [str(f) for f in possible_dbs if f.exists()]

    # Also scan for any other .db files in project
    scan_dbs = list(okysa_root.glob("*.db")) + list(
        (okysa_root / "Golconda").glob("*.db")
    )
    for f in scan_dbs:
        gf = str(f)
        if gf not in db_files:
            db_files.append(gf)

    if db_files:
        print(f"\nDetected existing database files: {', '.join(db_files)}")
        # db_files holds absolute paths, so compare against the expanded
        # path rather than the literal "~/NN.db".
        if str(home_db) in db_files:
            print(
                "Note: ~/NN.db was detected and contains historical configs/chatlogs."
            )

    env_vars["DATABASE"] = get_input(
        "DATABASE (path to main okysa.db)",
        env_vars.get(
            "DATABASE",
            str(home_db) if home_db.exists() else str(okysa_root / "okysa.db"),
        ),
    )
    env_vars["REMIND_DATABASE"] = get_input(
        "REMIND_DATABASE (path to remind.db)",
        env_vars.get("REMIND_DATABASE", str(okysa_root / "Golconda" / "remind.db")),
    )

    # Storage detection
    possible_storages = [
        okysa_root / "Golconda_storage.json",
    ]
    storage_files = [str(f) for f in possible_storages if f.exists()]
    if storage_files:
        print(f"\nDetected storage files: {', '.join(storage_files)}")

    # Asked once, after the detection output above.
    env_vars["STORAGE"] = get_input(
        "STORAGE (path to storage JSON)",
        env_vars.get(
            "STORAGE",
            str(okysa_root / "Golconda_storage.json"),
        ),
    )

    if "DISCORD_TOKEN" not in env_vars:
        token_path = Path.home() / "token.discord"
        default_token = ""
        if token_path.exists():
            default_token = token_path.read_text().strip()
        env_vars["DISCORD_TOKEN"] = get_input("DISCORD_TOKEN", default_token)

    domain_map = detect_domains()
    domain = ""
    existing_config = None
    if domain_map:
        print("\nDetected domains from Nginx:")
        sorted_domains = sorted(domain_map)
        for i, d in enumerate(sorted_domains):
            print(f"  {i + 1}. {d} (in {domain_map[d].name})")
        choice = get_input(f"Choose domain (1-{len(sorted_domains)}) or enter new", "1")
        if choice.isdigit() and 1 <= int(choice) <= len(sorted_domains):
            domain = sorted_domains[int(choice) - 1]
            existing_config = domain_map[domain]
        else:
            # Anything that is not a valid index is taken as a new domain.
            domain = choice
    else:
        domain = get_input("Domain for Nginx", env_vars.get("NOSSI", "nossinet.cc"))

    # 3. Prepare scripts
    deploy_sh_path = okysa_root / "scripts" / "deploy.sh"
    webhook_py_path = okysa_root / "scripts" / "webhook_listener.py"

    # Ensure scripts are executable
    os.chmod(deploy_sh_path, 0o755)
    os.chmod(webhook_py_path, 0o755)

    # 4. Generate Systemd Units
    bot_service = f"""[Unit]
Description=Okysa Discord Bot
After=network.target

[Service]
Type=simple
User={user}
WorkingDirectory={okysa_root}
EnvironmentFile={env_path}
ExecStart={uv_path} run Okysa.py
Restart=always

[Install]
WantedBy=multi-user.target
"""

    webhook_service = f"""[Unit]
Description=Okysa Webhook Listener
After=network.target

[Service]
Type=simple
User={user}
WorkingDirectory={okysa_root}
EnvironmentFile={env_path}
ExecStart=/usr/bin/python3 {okysa_root}/scripts/webhook_listener.py
Restart=always

[Install]
WantedBy=multi-user.target
"""

    # 5. Generate Nginx Config
    nginx_block = """
    location /webhook {
        proxy_pass http://localhost:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
"""
    nginx_conf = f"""# Managed by Okysa Installer
server {{
    listen 80;
    server_name {domain};
{nginx_block}
}}
"""

    print("\n--- Proposed Actions ---")
    print(f"1. Save configuration to {env_path}")
    print("2. Write /etc/systemd/system/okysa.service")
    print("3. Write /etc/systemd/system/okysa-webhook.service")
    if existing_config:
        print(f"4. Inject /webhook into existing config: {existing_config.name}")
    else:
        print(f"4. Write new Nginx config: /etc/nginx/sites-available/{domain}")
    print("5. Add sudoers entry for systemctl restart")

    if get_input("Perform these actions? (requires sudo)", "n").lower() != "y":
        print("\nInstallation aborted. No changes made.")
        return

    cleanup_legacy_backups()
    save_env(env_path, env_vars)
    print(f"Configuration saved to {env_path}")

    def sudo_write(content, path):
        # Write a root-owned file by piping the content through ``sudo tee``.
        subprocess.run(
            ["sudo", "tee", str(path)],
            input=content.encode(),
            stdout=subprocess.DEVNULL,
        )

    sudo_write(bot_service, "/etc/systemd/system/okysa.service")
    sudo_write(webhook_service, "/etc/systemd/system/okysa-webhook.service")

    if existing_config:
        if not inject_nginx_config(existing_config, domain, nginx_block):
            # Installation continues, but the /webhook route is not in place.
            print(
                f"WARNING: /webhook was not added to {existing_config.name}; "
                "add the location block manually."
            )
    else:
        target_path = Path(f"/etc/nginx/sites-available/{domain}")
        if target_path.exists():
            backup_dir = target_path.parent.parent / "backups"
            if not backup_dir.exists():
                subprocess.run(["sudo", "mkdir", "-p", str(backup_dir)], check=True)

            new_backup = (
                backup_dir / f"{target_path.name}.bak.{secrets.token_hex(4)}"
            )
            print(
                f"Existing config found at {target_path}. Backing up to {new_backup}"
            )
            subprocess.run(
                ["sudo", "cp", str(target_path), str(new_backup)], check=True
            )

        sudo_write(nginx_conf, str(target_path))
        if (
            get_input(f"Link /etc/nginx/sites-enabled/{domain}? (y/n)", "y").lower()
            == "y"
        ):
            subprocess.run(
                [
                    "sudo",
                    "ln",
                    "-sf",
                    f"/etc/nginx/sites-available/{domain}",
                    f"/etc/nginx/sites-enabled/{domain}",
                ]
            )

    # Reload Nginx regardless of method (inject handles its own test)
    print("Reloading Nginx...")
    subprocess.run(["sudo", "systemctl", "reload", "nginx"])

    sudoers_line = (
        f"{user} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart okysa.service\n"
    )
    sudo_write(sudoers_line, "/etc/sudoers.d/okysa-deploy")

    subprocess.run(["sudo", "systemctl", "daemon-reload"])
    if get_input("Enable and start services now? (y/n)", "y").lower() == "y":
        subprocess.run(
            ["sudo", "systemctl", "enable", "--now", "okysa-webhook", "okysa"]
        )

    # Register webhook and only update secret if user said yes
    new_secret = register_webhook(domain, env_vars.get("GITHUB_WEBHOOK_SECRET"))
    if new_secret:
        env_vars["GITHUB_WEBHOOK_SECRET"] = new_secret
        save_env(env_path, env_vars)
        print(f"Updated {env_path} with the registered webhook secret.")
        print(f"Secret: {new_secret}")
    else:
        print("Webhook registration skipped or failed. Secret not updated in .env.")
        current_secret = env_vars.get("GITHUB_WEBHOOK_SECRET")
        if current_secret:
            print(f"Current secret preserved: {current_secret}")

    print("\nInstallation complete.")
×
569

570

571
# Run the installer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc