• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

x4dr / Okysa / 21277007407

23 Jan 2026 06:31AM UTC coverage: 78.966% (-0.4%) from 79.352%
21277007407

push

github

x4dr
installscript backups

1 of 22 new or added lines in 1 file covered. (4.55%)

1 existing line in 1 file now uncovered.

2718 of 3442 relevant lines covered (78.97%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.6
/scripts/install.py
1
#!/usr/bin/env python3
2
import argparse
1✔
3
import getpass
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import secrets
1✔
8
import subprocess
1✔
9
from pathlib import Path
1✔
10

11

12
def get_input(prompt, default=None):
1✔
13
    if default:
×
14
        res = input(f"{prompt} [{default}]: ").strip()
×
15
        return res if res else default
×
16
    return input(f"{prompt}: ").strip()
×
17

18

19
def get_current_user():
1✔
20
    try:
×
21
        return os.getlogin()
×
22
    except OSError:
×
23
        return getpass.getuser()
×
24

25

26
def detect_domains():
1✔
27
    domain_map = {}
×
28
    sites_enabled = Path("/etc/nginx/sites-enabled")
×
29
    if not sites_enabled.exists():
×
30
        return domain_map
×
31

32
    for site in sites_enabled.iterdir():
×
NEW
33
        if ".bak." in site.name:
×
NEW
34
            continue
×
35
        try:
×
36
            content = site.read_text()
×
37
            # Match lines that start with server_name (ignoring comments)
NEW
38
            matches = re.findall(r"^\s*server_name\s+([^;]+);", content, re.MULTILINE)
×
39
            for match in matches:
×
40
                for domain in match.split():
×
41
                    if domain and not domain.startswith("_"):
×
42
                        domain_map[domain] = site
×
43
        except Exception:
×
44
            continue
×
45
    return domain_map
×
46

47

48
def cleanup_legacy_backups():
1✔
NEW
49
    print("Checking for legacy backup files in Nginx directories...")
×
NEW
50
    paths = [Path("/etc/nginx/sites-enabled"), Path("/etc/nginx/sites-available")]
×
NEW
51
    for p in paths:
×
NEW
52
        if p.exists():
×
NEW
53
            for f in p.iterdir():
×
NEW
54
                if ".bak." in f.name:
×
NEW
55
                    print(f"Removing legacy backup: {f}")
×
NEW
56
                    subprocess.run(["sudo", "rm", str(f)], check=True)
×
57

58

59
def inject_nginx_config(filepath, domain, block):
1✔
60
    content = filepath.read_text()
×
61
    if block.strip() in content:
×
62
        print(f"Webhook block already exists in {filepath.name}. Skipping injection.")
×
63
        return True
×
64

65
    # Backup
NEW
66
    backup_dir = filepath.parent.parent / "backups"
×
NEW
67
    if not backup_dir.exists():
×
NEW
68
        subprocess.run(["sudo", "mkdir", "-p", str(backup_dir)], check=True)
×
69

NEW
70
    backup = backup_dir / f"{filepath.name}.bak.{secrets.token_hex(4)}"
×
71
    subprocess.run(["sudo", "cp", str(filepath), str(backup)], check=True)
×
NEW
72
    print(f"Created Nginx backup: {backup}")
×
73

74
    # Injection logic
75
    # Find the server block for the domain
76
    # We look for server_name domain; and then backtrack to find the closest server { before it
77
    # But a simpler way is to find the server { that contains server_name domain;
78
    # and then find the last } of that block.
79

80
    lines = content.splitlines()
×
81
    in_server_block = False
×
82
    brace_count = 0
×
83

84
    # This is a basic parser that looks for the server block containing the domain
85
    for i, line in enumerate(lines):
×
86
        if "server {" in line:
×
87
            in_server_block = True
×
88
            brace_count = 1
×
89
            continue
×
90

91
        if in_server_block:
×
92
            brace_count += line.count("{")
×
93
            brace_count -= line.count("}")
×
94

95
            if domain in line and "server_name" in line:
×
96
                # Found the right block. Now find where it ends.
97
                # Continue until brace_count is 0
98
                for j in range(i + 1, len(lines)):
×
99
                    brace_count += lines[j].count("{")
×
100
                    brace_count -= lines[j].count("}")
×
101
                    if brace_count == 0:
×
102
                        # Insert before this line
103
                        new_lines = lines[:j] + [block] + lines[j:]
×
104
                        new_content = "\n".join(new_lines)
×
105

106
                        # Write and test
107
                        try:
×
108
                            subprocess.run(
×
109
                                ["sudo", "tee", str(filepath)],
110
                                input=new_content.encode(),
111
                                stdout=subprocess.DEVNULL,
112
                                check=True,
113
                            )
114
                            print("Injected webhook block into existing Nginx config.")
×
115
                            if subprocess.run(["sudo", "nginx", "-t"]).returncode == 0:
×
116
                                return True
×
117
                            else:
118
                                print("Nginx test failed! Rolling back...")
×
119
                                subprocess.run(
×
120
                                    ["sudo", "cp", str(backup), str(filepath)],
121
                                    check=True,
122
                                )
123
                                return False
×
124
                        except Exception as e:
×
125
                            print(f"Error during injection: {e}")
×
126
                            subprocess.run(
×
127
                                ["sudo", "cp", str(backup), str(filepath)], check=True
128
                            )
129
                            return False
×
130

131
            if brace_count == 0:
×
132
                in_server_block = False
×
133

134
    print(f"Could not find a suitable server block for {domain} in {filepath.name}.")
×
135
    return False
×
136

137

138
def get_repo_nwo():
1✔
139
    try:
×
140
        res = subprocess.run(
×
141
            ["gh", "repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"],
142
            capture_output=True,
143
            text=True,
144
            check=True,
145
        )
146
        return res.stdout.strip()
×
147
    except subprocess.CalledProcessError:
×
148
        return None
×
149

150

151
def register_webhook(domain, existing_secret):
1✔
152
    print("\n--- GitHub Webhook Registration ---")
×
153
    use_gh = get_input("Use 'gh' CLI to register/update webhook?", "y")
×
154
    if use_gh.lower() != "y":
×
155
        return None
×
156

157
    if subprocess.run(["which", "gh"], capture_output=True).returncode != 0:
×
158
        print("Error: 'gh' CLI not found.")
×
159
        return None
×
160

161
    # Check GH auth status
162
    auth_check = subprocess.run(["gh", "auth", "status"], capture_output=True)
×
163
    if auth_check.returncode != 0:
×
164
        print("GitHub CLI is not authenticated.")
×
165
        if get_input("Run 'gh auth login' now?", "y").lower() == "y":
×
166
            try:
×
167
                subprocess.run(["gh", "auth", "login"], check=True)
×
168
            except subprocess.CalledProcessError:
×
169
                return None
×
170
        else:
171
            return None
×
172

173
    nwo = get_repo_nwo()
×
174
    if not nwo:
×
175
        print("Could not determine repository name.")
×
176
        return None
×
177

178
    webhook_url = f"https://{domain}/webhook"
×
179

180
    # Generate new secret if needed
181
    secret = existing_secret or secrets.token_urlsafe(32)
×
182

183
    # Check for existing webhook
184
    try:
×
185
        hooks_json = subprocess.run(
×
186
            ["gh", "api", f"repos/{nwo}/hooks"],
187
            capture_output=True,
188
            text=True,
189
            check=True,
190
        ).stdout
191
        hooks = json.loads(hooks_json)
×
192
        existing_hook = next(
×
193
            (h for h in hooks if h.get("config", {}).get("url") == webhook_url), None
194
        )
195

196
        hook_data = {
×
197
            "active": True,
198
            "events": ["workflow_run"],
199
            "config": {
200
                "url": webhook_url,
201
                "content_type": "json",
202
                "secret": secret,
203
            },
204
        }
205

206
        if existing_hook:
×
207
            print(f"Found existing webhook (ID: {existing_hook['id']}). Updating...")
×
208
            endpoint = f"repos/{nwo}/hooks/{existing_hook['id']}"
×
209
            method = "PATCH"
×
210
        else:
211
            print("Creating new webhook...")
×
212
            hook_data["name"] = "web"
×
213
            endpoint = f"repos/{nwo}/hooks"
×
214
            method = "POST"
×
215

216
        subprocess.run(
×
217
            ["gh", "api", endpoint, "--method", method, "--input", "-"],
218
            input=json.dumps(hook_data),
219
            text=True,
220
            check=True,
221
        )
222
        print(f"Successfully registered/updated webhook: {webhook_url}")
×
223
        return secret
×
224
    except Exception as e:
×
225
        print(f"Failed to manage webhook: {e}")
×
226
        return None
×
227

228

229
def uninstall(okysa_root):
1✔
230
    print("\n--- Uninstalling Okysa Deployment ---")
×
231

232
    # Stop and disable services
233
    services = ["okysa.service", "okysa-webhook.service"]
×
234
    for svc in services:
×
235
        print(f"Stopping and disabling {svc}...")
×
236
        subprocess.run(["sudo", "systemctl", "stop", svc], stderr=subprocess.DEVNULL)
×
237
        subprocess.run(["sudo", "systemctl", "disable", svc], stderr=subprocess.DEVNULL)
×
238
        subprocess.run(
×
239
            ["sudo", "rm", f"/etc/systemd/system/{svc}"], stderr=subprocess.DEVNULL
240
        )
241

242
    # Nginx
243
    sites_available = Path("/etc/nginx/sites-available")
×
244
    if sites_available.exists():
×
245
        for site in sites_available.iterdir():
×
246
            try:
×
247
                content = site.read_text()
×
248
                if "# Managed by Okysa Installer" in content:
×
249
                    print(f"Removing Nginx config: {site.name}")
×
250
                    subprocess.run(
×
251
                        ["sudo", "rm", f"/etc/nginx/sites-enabled/{site.name}"],
252
                        stderr=subprocess.DEVNULL,
253
                    )
254
                    subprocess.run(["sudo", "rm", str(site)], stderr=subprocess.DEVNULL)
×
255
            except:
×
256
                continue
×
257

258
    # Sudoers
259
    print("Removing sudoers entry...")
×
260
    subprocess.run(
×
261
        ["sudo", "rm", "/etc/sudoers.d/okysa-deploy"], stderr=subprocess.DEVNULL
262
    )
263

264
    subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
265
    print("\nUninstallation complete.")
×
266

267

268
def load_env(env_path):
1✔
269
    env_vars = {}
×
270
    if env_path.exists():
×
271
        with open(env_path, "r") as f:
×
272
            for line in f:
×
273
                line = line.strip()
×
274
                if "=" in line and not line.startswith("#"):
×
275
                    k, v = line.split("=", 1)
×
276
                    env_vars[k.strip()] = v.strip().strip('"').strip("'")
×
277
    return env_vars
×
278

279

280
def save_env(env_path, env_vars):
1✔
281
    with open(env_path, "w") as f:
×
282
        for k, v in sorted(env_vars.items()):
×
283
            f.write(f'{k}="{v}"\n')
×
284

285

286
def main():
1✔
287
    parser = argparse.ArgumentParser(description="Okysa Deployment Installer")
×
288
    parser.add_argument(
×
289
        "--uninstall", action="store_true", help="Uninstall the deployment"
290
    )
291
    args = parser.parse_args()
×
292

293
    script_path = Path(__file__).resolve()
×
294
    okysa_root = script_path.parent.parent
×
295
    env_path = okysa_root / ".env"
×
296
    home_dir = str(Path.home())
×
297

298
    if args.uninstall:
×
299
        uninstall(okysa_root)
×
300
        return
×
301

302
    gamepack_root = okysa_root.parent / "GamePack"
×
303
    uv_path = subprocess.getoutput("which uv") or "uv"
×
304

305
    print("--- Path Detection ---")
×
306
    print(f"Okysa Root:    {okysa_root}")
×
307
    if gamepack_root.exists():
×
308
        print(f"GamePack Root: {gamepack_root}")
×
309
    else:
310
        print(f"WARNING: GamePack not found at {gamepack_root}")
×
311
        gamepack_root = Path(
×
312
            get_input("Enter absolute path to GamePack", str(gamepack_root))
313
        )
314

315
    # 2. Gather Configuration
316
    print("\n--- Configuration ---")
×
317
    user = get_input("System user to run the bot", get_current_user())
×
318

319
    env_vars = load_env(env_path)
×
320

321
    def to_generic(p):
×
322
        return str(p).replace(home_dir, "~")
×
323

324
    # Required Env Vars
325
    env_vars["NOSSI"] = get_input(
×
326
        "NOSSI (domain for web services)", env_vars.get("NOSSI", "nossinet.cc")
327
    )
328
    env_vars["OLLAMA"] = get_input(
×
329
        "OLLAMA (URL for AI API)",
330
        env_vars.get("OLLAMA", "http://localhost:11434"),
331
    )
332
    env_vars["WIKI"] = get_input(
×
333
        "WIKI (path to wiki files)",
334
        env_vars.get("WIKI", to_generic(okysa_root.parent / "wiki")),
335
    )
336
    env_vars["STORAGE"] = get_input(
×
337
        "STORAGE (path to storage JSON)",
338
        env_vars.get("STORAGE", to_generic(okysa_root / "Golconda_storage.json")),
339
    )
340

341
    # Database detection and prompts
342
    # Look for known historical locations
343
    possible_dbs = [
×
344
        okysa_root / "okysa.db",
345
        Path.home() / "NN.db",
346
        okysa_root / "Golconda" / "remind.db",
347
    ]
348
    db_files = [to_generic(f) for f in possible_dbs if f.exists()]
×
349

350
    # Also scan for any other .db files in project
351
    scan_dbs = list(okysa_root.glob("*.db")) + list(
×
352
        (okysa_root / "Golconda").glob("*.db")
353
    )
354
    for f in scan_dbs:
×
355
        gf = to_generic(f)
×
356
        if gf not in db_files:
×
357
            db_files.append(gf)
×
358

359
    if db_files:
×
360
        print(f"\nDetected existing database files: {', '.join(db_files)}")
×
361
        if "~/NN.db" in db_files:
×
362
            print(
×
363
                "Note: ~/NN.db was detected and contains historical configs/chatlogs."
364
            )
365

366
    env_vars["DATABASE"] = get_input(
×
367
        "DATABASE (path to main okysa.db)",
368
        env_vars.get(
369
            "DATABASE",
370
            (
371
                to_generic(Path.home() / "NN.db")
372
                if (Path.home() / "NN.db").exists()
373
                else to_generic(okysa_root / "okysa.db")
374
            ),
375
        ),
376
    )
377
    env_vars["REMIND_DATABASE"] = get_input(
×
378
        "REMIND_DATABASE (path to remind.db)",
379
        env_vars.get(
380
            "REMIND_DATABASE", to_generic(okysa_root / "Golconda" / "remind.db")
381
        ),
382
    )
383

384
    # Storage detection
385
    possible_storages = [
×
386
        okysa_root / "Golconda_storage.json",
387
    ]
388
    storage_files = [to_generic(f) for f in possible_storages if f.exists()]
×
389
    if storage_files:
×
390
        print(f"\nDetected storage files: {', '.join(storage_files)}")
×
391

392
    env_vars["STORAGE"] = get_input(
×
393
        "STORAGE (path to storage JSON)",
394
        env_vars.get(
395
            "STORAGE",
396
            to_generic(okysa_root / "Golconda_storage.json"),
397
        ),
398
    )
399

400
    if "DISCORD_TOKEN" not in env_vars:
×
401
        token_path = Path.home() / "token.discord"
×
402
        default_token = ""
×
403
        if token_path.exists():
×
404
            default_token = token_path.read_text().strip()
×
405
        env_vars["DISCORD_TOKEN"] = get_input(
×
406
            "DISCORD_TOKEN", env_vars.get("DISCORD_TOKEN", default_token)
407
        )
408

409
    domain_map = detect_domains()
×
410
    domain = ""
×
411
    existing_config = None
×
412
    if domain_map:
×
413
        print("\nDetected domains from Nginx:")
×
414
        sorted_domains = sorted(list(domain_map.keys()))
×
415
        for i, d in enumerate(sorted_domains):
×
416
            print(f"  {i + 1}. {d} (in {domain_map[d].name})")
×
417
        choice = get_input(f"Choose domain (1-{len(sorted_domains)}) or enter new", "1")
×
418
        if choice.isdigit() and 1 <= int(choice) <= len(sorted_domains):
×
419
            domain = sorted_domains[int(choice) - 1]
×
420
            existing_config = domain_map[domain]
×
421
        else:
422
            domain = choice
×
423
    else:
424
        domain = get_input("Domain for Nginx", env_vars.get("NOSSI", "nossinet.cc"))
×
425

426
    # 3. Prepare scripts
427
    deploy_sh_path = okysa_root / "scripts" / "deploy.sh"
×
428
    webhook_py_path = okysa_root / "scripts" / "webhook_listener.py"
×
429

430
    # Ensure scripts are executable
431
    os.chmod(deploy_sh_path, 0o755)
×
432
    os.chmod(webhook_py_path, 0o755)
×
433

434
    # 4. Generate Systemd Units
435
    # Use %h for home directory to avoid hardcoding the username in the unit files
436
    home_dir = str(Path.home())
×
437
    okysa_root_generic = str(okysa_root).replace(home_dir, "%h")
×
438
    env_path_generic = str(env_path).replace(home_dir, "%h")
×
439
    uv_path_generic = str(uv_path).replace(home_dir, "%h")
×
440

441
    bot_service = f"""[Unit]
×
442
Description=Okysa Discord Bot
443
After=network.target
444

445
[Service]
446
Type=simple
447
User={user}
448
WorkingDirectory={okysa_root_generic}
449
EnvironmentFile={env_path_generic}
450
ExecStart={uv_path_generic} run Okysa.py
451
Restart=always
452

453
[Install]
454
WantedBy=multi-user.target
455
"""
456

457
    webhook_service = f"""[Unit]
×
458
Description=Okysa Webhook Listener
459
After=network.target
460

461
[Service]
462
Type=simple
463
User={user}
464
WorkingDirectory={okysa_root_generic}
465
EnvironmentFile={env_path_generic}
466
ExecStart=/usr/bin/python3 {okysa_root_generic}/scripts/webhook_listener.py
467
Restart=always
468

469
[Install]
470
WantedBy=multi-user.target
471
"""
472

473
    # 5. Generate Nginx Config
474
    nginx_block = """
×
475
    location /webhook {
476
        proxy_pass http://localhost:5000;
477
        proxy_set_header Host $host;
478
        proxy_set_header X-Real-IP $remote_addr;
479
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
480
        proxy_set_header X-Forwarded-Proto $scheme;
481
    }
482
"""
483
    nginx_conf = f"""# Managed by Okysa Installer
×
484
server {{
485
    listen 80;
486
    server_name {domain};
487
{nginx_block}
488
}}
489
"""
490

491
    print("\n--- Proposed Actions ---")
×
492
    print(f"1. Save configuration to {env_path}")
×
493
    print("2. Write /etc/systemd/system/okysa.service")
×
494
    print("3. Write /etc/systemd/system/okysa-webhook.service")
×
495
    if existing_config:
×
496
        print(f"4. Inject /webhook into existing config: {existing_config.name}")
×
497
    else:
498
        print(f"4. Write new Nginx config: /etc/nginx/sites-available/{domain}")
×
499
    print("5. Add sudoers entry for systemctl restart")
×
500

501
    if get_input("Perform these actions? (requires sudo)", "n").lower() == "y":
×
NEW
502
        cleanup_legacy_backups()
×
503
        save_env(env_path, env_vars)
×
504
        print(f"Configuration saved to {env_path}")
×
505

506
        def sudo_write(content, path):
×
507
            subprocess.run(
×
508
                ["sudo", "tee", str(path)],
509
                input=content.encode(),
510
                stdout=subprocess.DEVNULL,
511
            )
512

513
        sudo_write(bot_service, "/etc/systemd/system/okysa.service")
×
514
        sudo_write(webhook_service, "/etc/systemd/system/okysa-webhook.service")
×
515

516
        if existing_config:
×
517
            inject_nginx_config(existing_config, domain, nginx_block)
×
518
        else:
519
            target_path = Path(f"/etc/nginx/sites-available/{domain}")
×
520
            if target_path.exists():
×
NEW
521
                backup_dir = target_path.parent.parent / "backups"
×
NEW
522
                if not backup_dir.exists():
×
NEW
523
                    subprocess.run(["sudo", "mkdir", "-p", str(backup_dir)], check=True)
×
524

NEW
525
                new_backup = (
×
526
                    backup_dir / f"{target_path.name}.bak.{secrets.token_hex(4)}"
527
                )
UNCOV
528
                print(
×
529
                    f"Existing config found at {target_path}. Backing up to {new_backup}"
530
                )
531
                subprocess.run(
×
532
                    ["sudo", "cp", str(target_path), str(new_backup)], check=True
533
                )
534

535
            sudo_write(nginx_conf, str(target_path))
×
536
            if (
×
537
                get_input(f"Link /etc/nginx/sites-enabled/{domain}? (y/n)", "y").lower()
538
                == "y"
539
            ):
540
                subprocess.run(
×
541
                    [
542
                        "sudo",
543
                        "ln",
544
                        "-sf",
545
                        f"/etc/nginx/sites-available/{domain}",
546
                        f"/etc/nginx/sites-enabled/{domain}",
547
                    ]
548
                )
549

550
        # Reload Nginx regardless of method (inject handles its own test)
551
        print("Reloading Nginx...")
×
552
        subprocess.run(["sudo", "systemctl", "reload", "nginx"])
×
553

554
        sudoers_line = (
×
555
            f"{user} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart okysa.service\n"
556
        )
557
        sudo_write(sudoers_line, "/etc/sudoers.d/okysa-deploy")
×
558

559
        subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
560
        if get_input("Enable and start services now? (y/n)", "y").lower() == "y":
×
561
            subprocess.run(
×
562
                ["sudo", "systemctl", "enable", "--now", "okysa-webhook", "okysa"]
563
            )
564

565
        # Register webhook and only update secret if user said yes
566
        new_secret = register_webhook(domain, env_vars.get("GITHUB_WEBHOOK_SECRET"))
×
567
        if new_secret:
×
568
            env_vars["GITHUB_WEBHOOK_SECRET"] = new_secret
×
569
            save_env(env_path, env_vars)
×
570
            print(f"Updated {env_path} with the registered webhook secret.")
×
571
            print(f"Secret: {new_secret}")
×
572
        else:
573
            print("Webhook registration skipped or failed. Secret not updated in .env.")
×
574
            current_secret = env_vars.get("GITHUB_WEBHOOK_SECRET")
×
575
            if current_secret:
×
576
                print(f"Current secret preserved: {current_secret}")
×
577

578
        print("\nInstallation complete.")
×
579
    else:
580
        print("\nInstallation aborted. No changes made.")
×
581

582

583
if __name__ == "__main__":
1✔
584
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc