• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

x4dr / Okysa / 21276568193

23 Jan 2026 06:10AM UTC coverage: 79.352% (-1.2%) from 80.57%
21276568193

push

github

x4dr
installing into nginx

1 of 74 new or added lines in 1 file covered. (1.35%)

2 existing lines in 1 file now uncovered.

2717 of 3424 relevant lines covered (79.35%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.67
/scripts/install.py
1
#!/usr/bin/env python3
2
import argparse
1✔
3
import getpass
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import secrets
1✔
8
import subprocess
1✔
9
from pathlib import Path
1✔
10

11

12
def get_input(prompt, default=None):
1✔
13
    if default:
×
14
        res = input(f"{prompt} [{default}]: ").strip()
×
15
        return res if res else default
×
16
    return input(f"{prompt}: ").strip()
×
17

18

19
def get_current_user():
1✔
20
    try:
×
21
        return os.getlogin()
×
22
    except OSError:
×
23
        return getpass.getuser()
×
24

25

26
def detect_domains():
1✔
NEW
27
    domain_map = {}
×
28
    sites_enabled = Path("/etc/nginx/sites-enabled")
×
29
    if not sites_enabled.exists():
×
NEW
30
        return domain_map
×
31

32
    for site in sites_enabled.iterdir():
×
33
        try:
×
34
            content = site.read_text()
×
35
            matches = re.findall(r"server_name\s+([^;]+);", content)
×
36
            for match in matches:
×
37
                for domain in match.split():
×
38
                    if domain and not domain.startswith("_"):
×
NEW
39
                        domain_map[domain] = site
×
40
        except Exception:
×
41
            continue
×
NEW
42
    return domain_map
×
43

44

45
def inject_nginx_config(filepath, domain, block):
1✔
NEW
46
    content = filepath.read_text()
×
NEW
47
    if block.strip() in content:
×
NEW
48
        print(f"Webhook block already exists in {filepath.name}. Skipping injection.")
×
NEW
49
        return True
×
50

51
    # Backup
NEW
52
    backup = filepath.with_suffix(f".bak.{secrets.token_hex(4)}")
×
NEW
53
    subprocess.run(["sudo", "cp", str(filepath), str(backup)], check=True)
×
NEW
54
    print(f"Created backup: {backup}")
×
55

56
    # Injection logic
57
    # Find the server block for the domain
58
    # We look for server_name domain; and then backtrack to find the closest server { before it
59
    # But a simpler way is to find the server { that contains server_name domain;
60
    # and then find the last } of that block.
61

NEW
62
    lines = content.splitlines()
×
NEW
63
    in_server_block = False
×
NEW
64
    brace_count = 0
×
65

66
    # This is a basic parser that looks for the server block containing the domain
NEW
67
    for i, line in enumerate(lines):
×
NEW
68
        if "server {" in line:
×
NEW
69
            in_server_block = True
×
NEW
70
            brace_count = 1
×
NEW
71
            continue
×
72

NEW
73
        if in_server_block:
×
NEW
74
            brace_count += line.count("{")
×
NEW
75
            brace_count -= line.count("}")
×
76

NEW
77
            if domain in line and "server_name" in line:
×
78
                # Found the right block. Now find where it ends.
79
                # Continue until brace_count is 0
NEW
80
                for j in range(i + 1, len(lines)):
×
NEW
81
                    brace_count += lines[j].count("{")
×
NEW
82
                    brace_count -= lines[j].count("}")
×
NEW
83
                    if brace_count == 0:
×
84
                        # Insert before this line
NEW
85
                        new_lines = lines[:j] + [block] + lines[j:]
×
NEW
86
                        new_content = "\n".join(new_lines)
×
87

88
                        # Write and test
NEW
89
                        try:
×
NEW
90
                            subprocess.run(
×
91
                                ["sudo", "tee", str(filepath)],
92
                                input=new_content.encode(),
93
                                stdout=subprocess.DEVNULL,
94
                                check=True,
95
                            )
NEW
96
                            print("Injected webhook block into existing Nginx config.")
×
NEW
97
                            if subprocess.run(["sudo", "nginx", "-t"]).returncode == 0:
×
NEW
98
                                return True
×
99
                            else:
NEW
100
                                print("Nginx test failed! Rolling back...")
×
NEW
101
                                subprocess.run(
×
102
                                    ["sudo", "cp", str(backup), str(filepath)],
103
                                    check=True,
104
                                )
NEW
105
                                return False
×
NEW
106
                        except Exception as e:
×
NEW
107
                            print(f"Error during injection: {e}")
×
NEW
108
                            subprocess.run(
×
109
                                ["sudo", "cp", str(backup), str(filepath)], check=True
110
                            )
NEW
111
                            return False
×
112

NEW
113
            if brace_count == 0:
×
NEW
114
                in_server_block = False
×
115

NEW
116
    print(f"Could not find a suitable server block for {domain} in {filepath.name}.")
×
NEW
117
    return False
×
118

119

120
def get_repo_nwo():
1✔
121
    try:
×
122
        res = subprocess.run(
×
123
            ["gh", "repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"],
124
            capture_output=True,
125
            text=True,
126
            check=True,
127
        )
128
        return res.stdout.strip()
×
129
    except subprocess.CalledProcessError:
×
130
        return None
×
131

132

133
def register_webhook(domain, existing_secret):
1✔
134
    print("\n--- GitHub Webhook Registration ---")
×
135
    use_gh = get_input("Use 'gh' CLI to register/update webhook?", "y")
×
136
    if use_gh.lower() != "y":
×
137
        return None
×
138

139
    if subprocess.run(["which", "gh"], capture_output=True).returncode != 0:
×
140
        print("Error: 'gh' CLI not found.")
×
141
        return None
×
142

143
    # Check GH auth status
144
    auth_check = subprocess.run(["gh", "auth", "status"], capture_output=True)
×
145
    if auth_check.returncode != 0:
×
146
        print("GitHub CLI is not authenticated.")
×
147
        if get_input("Run 'gh auth login' now?", "y").lower() == "y":
×
148
            try:
×
149
                subprocess.run(["gh", "auth", "login"], check=True)
×
150
            except subprocess.CalledProcessError:
×
151
                return None
×
152
        else:
153
            return None
×
154

155
    nwo = get_repo_nwo()
×
156
    if not nwo:
×
157
        print("Could not determine repository name.")
×
158
        return None
×
159

160
    webhook_url = f"https://{domain}/webhook"
×
161

162
    # Generate new secret if needed
163
    secret = existing_secret or secrets.token_urlsafe(32)
×
164

165
    # Check for existing webhook
166
    try:
×
167
        hooks_json = subprocess.run(
×
168
            ["gh", "api", f"repos/{nwo}/hooks"],
169
            capture_output=True,
170
            text=True,
171
            check=True,
172
        ).stdout
173
        hooks = json.loads(hooks_json)
×
174
        existing_hook = next(
×
175
            (h for h in hooks if h.get("config", {}).get("url") == webhook_url), None
176
        )
177

178
        hook_data = {
×
179
            "active": True,
180
            "events": ["workflow_run"],
181
            "config": {
182
                "url": webhook_url,
183
                "content_type": "json",
184
                "secret": secret,
185
            },
186
        }
187

188
        if existing_hook:
×
189
            print(f"Found existing webhook (ID: {existing_hook['id']}). Updating...")
×
190
            endpoint = f"repos/{nwo}/hooks/{existing_hook['id']}"
×
191
            method = "PATCH"
×
192
        else:
193
            print("Creating new webhook...")
×
194
            hook_data["name"] = "web"
×
195
            endpoint = f"repos/{nwo}/hooks"
×
196
            method = "POST"
×
197

198
        subprocess.run(
×
199
            ["gh", "api", endpoint, "--method", method, "--input", "-"],
200
            input=json.dumps(hook_data),
201
            text=True,
202
            check=True,
203
        )
204
        print(f"Successfully registered/updated webhook: {webhook_url}")
×
205
        return secret
×
206
    except Exception as e:
×
207
        print(f"Failed to manage webhook: {e}")
×
208
        return None
×
209

210

211
def uninstall(okysa_root):
1✔
212
    print("\n--- Uninstalling Okysa Deployment ---")
×
213

214
    # Stop and disable services
215
    services = ["okysa.service", "okysa-webhook.service"]
×
216
    for svc in services:
×
217
        print(f"Stopping and disabling {svc}...")
×
218
        subprocess.run(["sudo", "systemctl", "stop", svc], stderr=subprocess.DEVNULL)
×
219
        subprocess.run(["sudo", "systemctl", "disable", svc], stderr=subprocess.DEVNULL)
×
220
        subprocess.run(
×
221
            ["sudo", "rm", f"/etc/systemd/system/{svc}"], stderr=subprocess.DEVNULL
222
        )
223

224
    # Nginx
225
    sites_available = Path("/etc/nginx/sites-available")
×
226
    if sites_available.exists():
×
227
        for site in sites_available.iterdir():
×
228
            try:
×
229
                content = site.read_text()
×
230
                if "# Managed by Okysa Installer" in content:
×
231
                    print(f"Removing Nginx config: {site.name}")
×
232
                    subprocess.run(
×
233
                        ["sudo", "rm", f"/etc/nginx/sites-enabled/{site.name}"],
234
                        stderr=subprocess.DEVNULL,
235
                    )
236
                    subprocess.run(["sudo", "rm", str(site)], stderr=subprocess.DEVNULL)
×
237
            except:
×
238
                continue
×
239

240
    # Sudoers
241
    print("Removing sudoers entry...")
×
242
    subprocess.run(
×
243
        ["sudo", "rm", "/etc/sudoers.d/okysa-deploy"], stderr=subprocess.DEVNULL
244
    )
245

246
    subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
247
    print("\nUninstallation complete.")
×
248

249

250
def load_env(env_path):
1✔
251
    env_vars = {}
×
252
    if env_path.exists():
×
253
        with open(env_path, "r") as f:
×
254
            for line in f:
×
255
                line = line.strip()
×
256
                if "=" in line and not line.startswith("#"):
×
257
                    k, v = line.split("=", 1)
×
258
                    env_vars[k.strip()] = v.strip().strip('"').strip("'")
×
259
    return env_vars
×
260

261

262
def save_env(env_path, env_vars):
1✔
263
    with open(env_path, "w") as f:
×
264
        for k, v in sorted(env_vars.items()):
×
265
            f.write(f'{k}="{v}"\n')
×
266

267

268
def main():
1✔
269
    parser = argparse.ArgumentParser(description="Okysa Deployment Installer")
×
270
    parser.add_argument(
×
271
        "--uninstall", action="store_true", help="Uninstall the deployment"
272
    )
273
    args = parser.parse_args()
×
274

275
    script_path = Path(__file__).resolve()
×
276
    okysa_root = script_path.parent.parent
×
277
    env_path = okysa_root / ".env"
×
278
    home_dir = str(Path.home())
×
279

280
    if args.uninstall:
×
281
        uninstall(okysa_root)
×
282
        return
×
283

284
    gamepack_root = okysa_root.parent / "GamePack"
×
285
    uv_path = subprocess.getoutput("which uv") or "uv"
×
286

287
    print("--- Path Detection ---")
×
288
    print(f"Okysa Root:    {okysa_root}")
×
289
    if gamepack_root.exists():
×
290
        print(f"GamePack Root: {gamepack_root}")
×
291
    else:
292
        print(f"WARNING: GamePack not found at {gamepack_root}")
×
293
        gamepack_root = Path(
×
294
            get_input("Enter absolute path to GamePack", str(gamepack_root))
295
        )
296

297
    # 2. Gather Configuration
298
    print("\n--- Configuration ---")
×
299
    user = get_input("System user to run the bot", get_current_user())
×
300

301
    env_vars = load_env(env_path)
×
302

303
    def to_generic(p):
×
304
        return str(p).replace(home_dir, "~")
×
305

306
    # Required Env Vars
307
    env_vars["NOSSI"] = get_input(
×
308
        "NOSSI (domain for web services)", env_vars.get("NOSSI", "nossinet.cc")
309
    )
310
    env_vars["OLLAMA"] = get_input(
×
311
        "OLLAMA (URL for AI API)",
312
        env_vars.get("OLLAMA", "http://localhost:11434"),
313
    )
314
    env_vars["WIKI"] = get_input(
×
315
        "WIKI (path to wiki files)",
316
        env_vars.get("WIKI", to_generic(okysa_root.parent / "wiki")),
317
    )
318
    env_vars["STORAGE"] = get_input(
×
319
        "STORAGE (path to storage JSON)",
320
        env_vars.get("STORAGE", to_generic(okysa_root / "Golconda_storage.json")),
321
    )
322

323
    # Database detection and prompts
324
    # Look for known historical locations
325
    possible_dbs = [
×
326
        okysa_root / "okysa.db",
327
        Path.home() / "NN.db",
328
        okysa_root / "Golconda" / "remind.db",
329
    ]
330
    db_files = [to_generic(f) for f in possible_dbs if f.exists()]
×
331

332
    # Also scan for any other .db files in project
333
    scan_dbs = list(okysa_root.glob("*.db")) + list(
×
334
        (okysa_root / "Golconda").glob("*.db")
335
    )
336
    for f in scan_dbs:
×
337
        gf = to_generic(f)
×
338
        if gf not in db_files:
×
339
            db_files.append(gf)
×
340

341
    if db_files:
×
342
        print(f"\nDetected existing database files: {', '.join(db_files)}")
×
343
        if "~/NN.db" in db_files:
×
344
            print(
×
345
                "Note: ~/NN.db was detected and contains historical configs/chatlogs."
346
            )
347

348
    env_vars["DATABASE"] = get_input(
×
349
        "DATABASE (path to main okysa.db)",
350
        env_vars.get(
351
            "DATABASE",
352
            (
353
                to_generic(Path.home() / "NN.db")
354
                if (Path.home() / "NN.db").exists()
355
                else to_generic(okysa_root / "okysa.db")
356
            ),
357
        ),
358
    )
359
    env_vars["REMIND_DATABASE"] = get_input(
×
360
        "REMIND_DATABASE (path to remind.db)",
361
        env_vars.get(
362
            "REMIND_DATABASE", to_generic(okysa_root / "Golconda" / "remind.db")
363
        ),
364
    )
365

366
    # Storage detection
367
    possible_storages = [
×
368
        okysa_root / "Golconda_storage.json",
369
    ]
370
    storage_files = [to_generic(f) for f in possible_storages if f.exists()]
×
371
    if storage_files:
×
372
        print(f"\nDetected storage files: {', '.join(storage_files)}")
×
373

374
    env_vars["STORAGE"] = get_input(
×
375
        "STORAGE (path to storage JSON)",
376
        env_vars.get(
377
            "STORAGE",
378
            to_generic(okysa_root / "Golconda_storage.json"),
379
        ),
380
    )
381

382
    if "DISCORD_TOKEN" not in env_vars:
×
383
        token_path = Path.home() / "token.discord"
×
384
        default_token = ""
×
385
        if token_path.exists():
×
386
            default_token = token_path.read_text().strip()
×
387
        env_vars["DISCORD_TOKEN"] = get_input(
×
388
            "DISCORD_TOKEN", env_vars.get("DISCORD_TOKEN", default_token)
389
        )
390

NEW
391
    domain_map = detect_domains()
×
NEW
392
    domain = ""
×
NEW
393
    existing_config = None
×
NEW
394
    if domain_map:
×
395
        print("\nDetected domains from Nginx:")
×
NEW
396
        sorted_domains = sorted(list(domain_map.keys()))
×
NEW
397
        for i, d in enumerate(sorted_domains):
×
NEW
398
            print(f"  {i + 1}. {d} (in {domain_map[d].name})")
×
NEW
399
        choice = get_input(f"Choose domain (1-{len(sorted_domains)}) or enter new", "1")
×
NEW
400
        if choice.isdigit() and 1 <= int(choice) <= len(sorted_domains):
×
NEW
401
            domain = sorted_domains[int(choice) - 1]
×
NEW
402
            existing_config = domain_map[domain]
×
403
        else:
404
            domain = choice
×
405
    else:
406
        domain = get_input("Domain for Nginx", env_vars.get("NOSSI", "nossinet.cc"))
×
407

408
    # 3. Prepare scripts
409
    deploy_sh_path = okysa_root / "scripts" / "deploy.sh"
×
410
    webhook_py_path = okysa_root / "scripts" / "webhook_listener.py"
×
411

412
    # Ensure scripts are executable
413
    os.chmod(deploy_sh_path, 0o755)
×
414
    os.chmod(webhook_py_path, 0o755)
×
415

416
    # 4. Generate Systemd Units
417
    # Use %h for home directory to avoid hardcoding the username in the unit files
418
    home_dir = str(Path.home())
×
419
    okysa_root_generic = str(okysa_root).replace(home_dir, "%h")
×
420
    env_path_generic = str(env_path).replace(home_dir, "%h")
×
421
    uv_path_generic = str(uv_path).replace(home_dir, "%h")
×
422

423
    bot_service = f"""[Unit]
×
424
Description=Okysa Discord Bot
425
After=network.target
426

427
[Service]
428
Type=simple
429
User={user}
430
WorkingDirectory={okysa_root_generic}
431
EnvironmentFile={env_path_generic}
432
ExecStart={uv_path_generic} run Okysa.py
433
Restart=always
434

435
[Install]
436
WantedBy=multi-user.target
437
"""
438

439
    webhook_service = f"""[Unit]
×
440
Description=Okysa Webhook Listener
441
After=network.target
442

443
[Service]
444
Type=simple
445
User={user}
446
WorkingDirectory={okysa_root_generic}
447
EnvironmentFile={env_path_generic}
448
ExecStart=/usr/bin/python3 {okysa_root_generic}/scripts/webhook_listener.py
449
Restart=always
450

451
[Install]
452
WantedBy=multi-user.target
453
"""
454

455
    # 5. Generate Nginx Config
NEW
456
    nginx_block = """
×
457
    location /webhook {
458
        proxy_pass http://localhost:5000;
459
        proxy_set_header Host $host;
460
        proxy_set_header X-Real-IP $remote_addr;
461
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
462
        proxy_set_header X-Forwarded-Proto $scheme;
463
    }
464
"""
NEW
465
    nginx_conf = f"""# Managed by Okysa Installer
×
466
server {{
467
    listen 80;
468
    server_name {domain};
469
{nginx_block}
470
}}
471
"""
472

473
    print("\n--- Proposed Actions ---")
×
474
    print(f"1. Save configuration to {env_path}")
×
475
    print("2. Write /etc/systemd/system/okysa.service")
×
476
    print("3. Write /etc/systemd/system/okysa-webhook.service")
×
NEW
477
    if existing_config:
×
NEW
478
        print(f"4. Inject /webhook into existing config: {existing_config.name}")
×
479
    else:
NEW
480
        print(f"4. Write new Nginx config: /etc/nginx/sites-available/{domain}")
×
UNCOV
481
    print("5. Add sudoers entry for systemctl restart")
×
482

483
    if get_input("Perform these actions? (requires sudo)", "n").lower() == "y":
×
484
        save_env(env_path, env_vars)
×
485
        print(f"Configuration saved to {env_path}")
×
486

487
        def sudo_write(content, path):
×
488
            subprocess.run(
×
489
                ["sudo", "tee", str(path)],
490
                input=content.encode(),
491
                stdout=subprocess.DEVNULL,
492
            )
493

494
        sudo_write(bot_service, "/etc/systemd/system/okysa.service")
×
495
        sudo_write(webhook_service, "/etc/systemd/system/okysa-webhook.service")
×
496

NEW
497
        if existing_config:
×
NEW
498
            inject_nginx_config(existing_config, domain, nginx_block)
×
499
        else:
NEW
500
            target_path = Path(f"/etc/nginx/sites-available/{domain}")
×
NEW
501
            if target_path.exists():
×
NEW
502
                new_backup = target_path.with_suffix(f".bak.{secrets.token_hex(4)}")
×
NEW
503
                print(
×
504
                    f"Existing config found at {target_path}. Backing up to {new_backup}"
505
                )
NEW
506
                subprocess.run(
×
507
                    ["sudo", "cp", str(target_path), str(new_backup)], check=True
508
                )
509

NEW
510
            sudo_write(nginx_conf, str(target_path))
×
NEW
511
            if (
×
512
                get_input(f"Link /etc/nginx/sites-enabled/{domain}? (y/n)", "y").lower()
513
                == "y"
514
            ):
NEW
515
                subprocess.run(
×
516
                    [
517
                        "sudo",
518
                        "ln",
519
                        "-sf",
520
                        f"/etc/nginx/sites-available/{domain}",
521
                        f"/etc/nginx/sites-enabled/{domain}",
522
                    ]
523
                )
524

525
        # Reload Nginx regardless of method (inject handles its own test)
NEW
526
        print("Reloading Nginx...")
×
NEW
527
        subprocess.run(["sudo", "systemctl", "reload", "nginx"])
×
528

529
        sudoers_line = (
×
530
            f"{user} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart okysa.service\n"
531
        )
532
        sudo_write(sudoers_line, "/etc/sudoers.d/okysa-deploy")
×
533

534
        subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
UNCOV
535
        if get_input("Enable and start services now? (y/n)", "y").lower() == "y":
×
536
            subprocess.run(
×
537
                ["sudo", "systemctl", "enable", "--now", "okysa-webhook", "okysa"]
538
            )
539

540
        # Register webhook and only update secret if user said yes
541
        new_secret = register_webhook(domain, env_vars.get("GITHUB_WEBHOOK_SECRET"))
×
542
        if new_secret:
×
543
            env_vars["GITHUB_WEBHOOK_SECRET"] = new_secret
×
544
            save_env(env_path, env_vars)
×
545
            print(f"Updated {env_path} with the registered webhook secret.")
×
546
            print(f"Secret: {new_secret}")
×
547
        else:
548
            print("Webhook registration skipped or failed. Secret not updated in .env.")
×
549
            current_secret = env_vars.get("GITHUB_WEBHOOK_SECRET")
×
550
            if current_secret:
×
551
                print(f"Current secret preserved: {current_secret}")
×
552

553
        print("\nInstallation complete.")
×
554
    else:
555
        print("\nInstallation aborted. No changes made.")
×
556

557

558
if __name__ == "__main__":
1✔
559
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc