• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

x4dr / Okysa / 21276210669

23 Jan 2026 05:53AM UTC coverage: 80.57% (-0.6%) from 81.146%
21276210669

push

github

x4dr
installing fixes

9 of 32 new or added lines in 3 files covered. (28.13%)

8 existing lines in 1 file now uncovered.

2716 of 3371 relevant lines covered (80.57%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.76
/scripts/install.py
1
#!/usr/bin/env python3
2
import argparse
1✔
3
import getpass
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import secrets
1✔
8
import subprocess
1✔
9
from pathlib import Path
1✔
10

11

12
def get_input(prompt, default=None):
1✔
13
    if default:
×
14
        res = input(f"{prompt} [{default}]: ").strip()
×
15
        return res if res else default
×
16
    return input(f"{prompt}: ").strip()
×
17

18

19
def get_current_user():
1✔
20
    try:
×
21
        return os.getlogin()
×
22
    except OSError:
×
23
        return getpass.getuser()
×
24

25

26
def detect_domains():
1✔
27
    domains = []
×
28
    sites_enabled = Path("/etc/nginx/sites-enabled")
×
29
    if not sites_enabled.exists():
×
30
        return domains
×
31

32
    for site in sites_enabled.iterdir():
×
33
        try:
×
34
            content = site.read_text()
×
35
            matches = re.findall(r"server_name\s+([^;]+);", content)
×
36
            for match in matches:
×
37
                for domain in match.split():
×
38
                    if domain and not domain.startswith("_"):
×
39
                        domains.append(domain)
×
40
        except Exception:
×
41
            continue
×
42
    return sorted(list(set(domains)))
×
43

44

45
def get_repo_nwo():
1✔
46
    try:
×
47
        res = subprocess.run(
×
48
            ["gh", "repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"],
49
            capture_output=True,
50
            text=True,
51
            check=True,
52
        )
53
        return res.stdout.strip()
×
54
    except subprocess.CalledProcessError:
×
55
        return None
×
56

57

58
def register_webhook(domain, existing_secret):
1✔
59
    print("\n--- GitHub Webhook Registration ---")
×
60
    use_gh = get_input("Use 'gh' CLI to register/update webhook?", "y")
×
61
    if use_gh.lower() != "y":
×
62
        return None
×
63

64
    if subprocess.run(["which", "gh"], capture_output=True).returncode != 0:
×
65
        print("Error: 'gh' CLI not found.")
×
66
        return None
×
67

68
    # Check GH auth status
69
    auth_check = subprocess.run(["gh", "auth", "status"], capture_output=True)
×
70
    if auth_check.returncode != 0:
×
71
        print("GitHub CLI is not authenticated.")
×
72
        if get_input("Run 'gh auth login' now?", "y").lower() == "y":
×
73
            try:
×
74
                subprocess.run(["gh", "auth", "login"], check=True)
×
75
            except subprocess.CalledProcessError:
×
76
                return None
×
77
        else:
78
            return None
×
79

80
    nwo = get_repo_nwo()
×
81
    if not nwo:
×
82
        print("Could not determine repository name.")
×
83
        return None
×
84

85
    webhook_url = f"https://{domain}/webhook"
×
86

87
    # Generate new secret if needed
88
    secret = existing_secret or secrets.token_urlsafe(32)
×
89

90
    # Check for existing webhook
91
    try:
×
92
        hooks_json = subprocess.run(
×
93
            ["gh", "api", f"repos/{nwo}/hooks"],
94
            capture_output=True,
95
            text=True,
96
            check=True,
97
        ).stdout
98
        hooks = json.loads(hooks_json)
×
99
        existing_hook = next(
×
100
            (h for h in hooks if h.get("config", {}).get("url") == webhook_url), None
101
        )
102

103
        hook_data = {
×
104
            "active": True,
105
            "events": ["workflow_run"],
106
            "config": {
107
                "url": webhook_url,
108
                "content_type": "json",
109
                "secret": secret,
110
            },
111
        }
112

113
        if existing_hook:
×
114
            print(f"Found existing webhook (ID: {existing_hook['id']}). Updating...")
×
115
            endpoint = f"repos/{nwo}/hooks/{existing_hook['id']}"
×
116
            method = "PATCH"
×
117
        else:
118
            print("Creating new webhook...")
×
119
            hook_data["name"] = "web"
×
120
            endpoint = f"repos/{nwo}/hooks"
×
121
            method = "POST"
×
122

123
        subprocess.run(
×
124
            ["gh", "api", endpoint, "--method", method, "--input", "-"],
125
            input=json.dumps(hook_data),
126
            text=True,
127
            check=True,
128
        )
129
        print(f"Successfully registered/updated webhook: {webhook_url}")
×
130
        return secret
×
131
    except Exception as e:
×
132
        print(f"Failed to manage webhook: {e}")
×
133
        return None
×
134

135

136
def uninstall(okysa_root):
1✔
137
    print("\n--- Uninstalling Okysa Deployment ---")
×
138

139
    # Stop and disable services
140
    services = ["okysa.service", "okysa-webhook.service"]
×
141
    for svc in services:
×
142
        print(f"Stopping and disabling {svc}...")
×
143
        subprocess.run(["sudo", "systemctl", "stop", svc], stderr=subprocess.DEVNULL)
×
144
        subprocess.run(["sudo", "systemctl", "disable", svc], stderr=subprocess.DEVNULL)
×
145
        subprocess.run(
×
146
            ["sudo", "rm", f"/etc/systemd/system/{svc}"], stderr=subprocess.DEVNULL
147
        )
148

149
    # Nginx
150
    sites_available = Path("/etc/nginx/sites-available")
×
151
    if sites_available.exists():
×
152
        for site in sites_available.iterdir():
×
153
            try:
×
154
                content = site.read_text()
×
155
                if "# Managed by Okysa Installer" in content:
×
156
                    print(f"Removing Nginx config: {site.name}")
×
157
                    subprocess.run(
×
158
                        ["sudo", "rm", f"/etc/nginx/sites-enabled/{site.name}"],
159
                        stderr=subprocess.DEVNULL,
160
                    )
161
                    subprocess.run(["sudo", "rm", str(site)], stderr=subprocess.DEVNULL)
×
162
            except:
×
163
                continue
×
164

165
    # Sudoers
166
    print("Removing sudoers entry...")
×
167
    subprocess.run(
×
168
        ["sudo", "rm", "/etc/sudoers.d/okysa-deploy"], stderr=subprocess.DEVNULL
169
    )
170

171
    subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
172
    print("\nUninstallation complete.")
×
173

174

175
def load_env(env_path):
1✔
176
    env_vars = {}
×
177
    if env_path.exists():
×
178
        with open(env_path, "r") as f:
×
179
            for line in f:
×
180
                line = line.strip()
×
181
                if "=" in line and not line.startswith("#"):
×
182
                    k, v = line.split("=", 1)
×
183
                    env_vars[k.strip()] = v.strip().strip('"').strip("'")
×
184
    return env_vars
×
185

186

187
def save_env(env_path, env_vars):
1✔
188
    with open(env_path, "w") as f:
×
189
        for k, v in sorted(env_vars.items()):
×
190
            f.write(f'{k}="{v}"\n')
×
191

192

193
def main():
1✔
194
    parser = argparse.ArgumentParser(description="Okysa Deployment Installer")
×
195
    parser.add_argument(
×
196
        "--uninstall", action="store_true", help="Uninstall the deployment"
197
    )
198
    args = parser.parse_args()
×
199

200
    script_path = Path(__file__).resolve()
×
201
    okysa_root = script_path.parent.parent
×
202
    env_path = okysa_root / ".env"
×
203
    home_dir = str(Path.home())
×
204

205
    if args.uninstall:
×
206
        uninstall(okysa_root)
×
207
        return
×
208

209
    gamepack_root = okysa_root.parent / "GamePack"
×
210
    uv_path = subprocess.getoutput("which uv") or "uv"
×
211

212
    print("--- Path Detection ---")
×
213
    print(f"Okysa Root:    {okysa_root}")
×
214
    if gamepack_root.exists():
×
215
        print(f"GamePack Root: {gamepack_root}")
×
216
    else:
217
        print(f"WARNING: GamePack not found at {gamepack_root}")
×
218
        gamepack_root = Path(
×
219
            get_input("Enter absolute path to GamePack", str(gamepack_root))
220
        )
221

222
    # 2. Gather Configuration
223
    print("\n--- Configuration ---")
×
224
    user = get_input("System user to run the bot", get_current_user())
×
225

226
    env_vars = load_env(env_path)
×
227

228
    def to_generic(p):
×
229
        return str(p).replace(home_dir, "~")
×
230

231
    # Required Env Vars
232
    env_vars["NOSSI"] = get_input(
×
233
        "NOSSI (domain for web services)", env_vars.get("NOSSI", "nossinet.cc")
234
    )
235
    env_vars["OLLAMA"] = get_input(
×
236
        "OLLAMA (URL for AI API)",
237
        env_vars.get("OLLAMA", "http://localhost:11434"),
238
    )
239
    env_vars["WIKI"] = get_input(
×
240
        "WIKI (path to wiki files)",
241
        env_vars.get("WIKI", to_generic(okysa_root.parent / "wiki")),
242
    )
243
    env_vars["STORAGE"] = get_input(
×
244
        "STORAGE (path to storage JSON)",
245
        env_vars.get("STORAGE", to_generic(okysa_root / "Golconda_storage.json")),
246
    )
247

248
    # Database detection and prompts
249
    # Look for known historical locations
NEW
250
    possible_dbs = [
×
251
        okysa_root / "okysa.db",
252
        Path.home() / "NN.db",
253
        okysa_root / "Golconda" / "remind.db",
254
    ]
NEW
255
    db_files = [to_generic(f) for f in possible_dbs if f.exists()]
×
256

257
    # Also scan for any other .db files in project
NEW
258
    scan_dbs = list(okysa_root.glob("*.db")) + list(
×
259
        (okysa_root / "Golconda").glob("*.db")
260
    )
NEW
261
    for f in scan_dbs:
×
NEW
262
        gf = to_generic(f)
×
NEW
263
        if gf not in db_files:
×
NEW
264
            db_files.append(gf)
×
265

266
    if db_files:
×
267
        print(f"\nDetected existing database files: {', '.join(db_files)}")
×
NEW
268
        if "~/NN.db" in db_files:
×
NEW
269
            print(
×
270
                "Note: ~/NN.db was detected and contains historical configs/chatlogs."
271
            )
272

273
    env_vars["DATABASE"] = get_input(
×
274
        "DATABASE (path to main okysa.db)",
275
        env_vars.get(
276
            "DATABASE",
277
            (
278
                to_generic(Path.home() / "NN.db")
279
                if (Path.home() / "NN.db").exists()
280
                else to_generic(okysa_root / "okysa.db")
281
            ),
282
        ),
283
    )
284
    env_vars["REMIND_DATABASE"] = get_input(
×
285
        "REMIND_DATABASE (path to remind.db)",
286
        env_vars.get(
287
            "REMIND_DATABASE", to_generic(okysa_root / "Golconda" / "remind.db")
288
        ),
289
    )
290

291
    # Storage detection
NEW
292
    possible_storages = [
×
293
        okysa_root / "Golconda_storage.json",
294
    ]
NEW
295
    storage_files = [to_generic(f) for f in possible_storages if f.exists()]
×
NEW
296
    if storage_files:
×
NEW
297
        print(f"\nDetected storage files: {', '.join(storage_files)}")
×
298

NEW
299
    env_vars["STORAGE"] = get_input(
×
300
        "STORAGE (path to storage JSON)",
301
        env_vars.get(
302
            "STORAGE",
303
            to_generic(okysa_root / "Golconda_storage.json"),
304
        ),
305
    )
306

307
    if "DISCORD_TOKEN" not in env_vars:
×
308
        token_path = Path.home() / "token.discord"
×
309
        default_token = ""
×
310
        if token_path.exists():
×
311
            default_token = token_path.read_text().strip()
×
312
        env_vars["DISCORD_TOKEN"] = get_input(
×
313
            "DISCORD_TOKEN", env_vars.get("DISCORD_TOKEN", default_token)
314
        )
315

316
    detected_domains = detect_domains()
×
317
    if detected_domains:
×
318
        print("\nDetected domains from Nginx:")
×
319
        for i, d in enumerate(detected_domains):
×
320
            print(f"  {i + 1}. {d}")
×
321
        choice = get_input(
×
322
            f"Choose domain (1-{len(detected_domains)}) or enter new", "1"
323
        )
324
        if choice.isdigit() and 1 <= int(choice) <= len(detected_domains):
×
325
            domain = detected_domains[int(choice) - 1]
×
326
        else:
327
            domain = choice
×
328
    else:
329
        domain = get_input("Domain for Nginx", env_vars.get("NOSSI", "nossinet.cc"))
×
330

331
    # 3. Prepare scripts
332
    deploy_sh_path = okysa_root / "scripts" / "deploy.sh"
×
333
    webhook_py_path = okysa_root / "scripts" / "webhook_listener.py"
×
334

335
    # Ensure scripts are executable
336
    os.chmod(deploy_sh_path, 0o755)
×
337
    os.chmod(webhook_py_path, 0o755)
×
338

339
    # 4. Generate Systemd Units
340
    # Use %h for home directory to avoid hardcoding the username in the unit files
341
    home_dir = str(Path.home())
×
342
    okysa_root_generic = str(okysa_root).replace(home_dir, "%h")
×
343
    env_path_generic = str(env_path).replace(home_dir, "%h")
×
344
    uv_path_generic = str(uv_path).replace(home_dir, "%h")
×
345

346
    bot_service = f"""[Unit]
×
347
Description=Okysa Discord Bot
348
After=network.target
349

350
[Service]
351
Type=simple
352
User={user}
353
WorkingDirectory={okysa_root_generic}
354
EnvironmentFile={env_path_generic}
355
ExecStart={uv_path_generic} run Okysa.py
356
Restart=always
357

358
[Install]
359
WantedBy=multi-user.target
360
"""
361

362
    webhook_service = f"""[Unit]
×
363
Description=Okysa Webhook Listener
364
After=network.target
365

366
[Service]
367
Type=simple
368
User={user}
369
WorkingDirectory={okysa_root_generic}
370
EnvironmentFile={env_path_generic}
371
ExecStart=/usr/bin/python3 {okysa_root_generic}/scripts/webhook_listener.py
372
Restart=always
373

374
[Install]
375
WantedBy=multi-user.target
376
"""
377

378
    # 5. Generate Nginx Config
379
    nginx_conf = f"""# Managed by Okysa Installer
×
380
server {{
381
    listen 80;
382
    server_name {domain};
383

384
    location /webhook {{
385
        proxy_pass http://localhost:5000;
386
        proxy_set_header Host $host;
387
        proxy_set_header X-Real-IP $remote_addr;
388
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
389
        proxy_set_header X-Forwarded-Proto $scheme;
390
    }}
391
}}
392
"""
393

394
    print("\n--- Proposed Actions ---")
×
395
    print(f"1. Save configuration to {env_path}")
×
396
    print("2. Write /etc/systemd/system/okysa.service")
×
397
    print("3. Write /etc/systemd/system/okysa-webhook.service")
×
398
    print(f"4. Write /etc/nginx/sites-available/{domain}")
×
399
    print("5. Add sudoers entry for systemctl restart")
×
400

401
    if get_input("Perform these actions? (requires sudo)", "n").lower() == "y":
×
402
        save_env(env_path, env_vars)
×
403
        print(f"Configuration saved to {env_path}")
×
404

405
        def sudo_write(content, path):
×
406
            subprocess.run(
×
407
                ["sudo", "tee", str(path)],
408
                input=content.encode(),
409
                stdout=subprocess.DEVNULL,
410
            )
411

412
        sudo_write(bot_service, "/etc/systemd/system/okysa.service")
×
413
        sudo_write(webhook_service, "/etc/systemd/system/okysa-webhook.service")
×
414
        sudo_write(nginx_conf, f"/etc/nginx/sites-available/{domain}")
×
415

416
        sudoers_line = (
×
417
            f"{user} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart okysa.service\n"
418
        )
419
        sudo_write(sudoers_line, "/etc/sudoers.d/okysa-deploy")
×
420

421
        if (
×
422
            get_input(f"Link /etc/nginx/sites-enabled/{domain}? (y/n)", "y").lower()
423
            == "y"
424
        ):
425
            subprocess.run(
×
426
                [
427
                    "sudo",
428
                    "ln",
429
                    "-sf",
430
                    f"/etc/nginx/sites-available/{domain}",
431
                    f"/etc/nginx/sites-enabled/{domain}",
432
                ]
433
            )
434
            # Check Nginx config and reload
435
            print("Checking Nginx configuration...")
×
436
            if subprocess.run(["sudo", "nginx", "-t"]).returncode == 0:
×
437
                print("Reloading Nginx...")
×
438
                subprocess.run(["sudo", "systemctl", "reload", "nginx"])
×
439
            else:
440
                print("WARNING: Nginx configuration test failed. Not reloading.")
×
441

442
        subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
443
        if get_input("Enable and start services now? (y/n)", "y").lower() == "y":
×
444
            subprocess.run(
×
445
                ["sudo", "systemctl", "enable", "--now", "okysa-webhook", "okysa"]
446
            )
447

448
        # Register webhook and only update secret if user said yes
449
        new_secret = register_webhook(domain, env_vars.get("GITHUB_WEBHOOK_SECRET"))
×
450
        if new_secret:
×
451
            env_vars["GITHUB_WEBHOOK_SECRET"] = new_secret
×
452
            save_env(env_path, env_vars)
×
453
            print(f"Updated {env_path} with the registered webhook secret.")
×
454
            print(f"Secret: {new_secret}")
×
455
        else:
456
            print("Webhook registration skipped or failed. Secret not updated in .env.")
×
457
            current_secret = env_vars.get("GITHUB_WEBHOOK_SECRET")
×
458
            if current_secret:
×
459
                print(f"Current secret preserved: {current_secret}")
×
460

461
        print("\nInstallation complete.")
×
462
    else:
463
        print("\nInstallation aborted. No changes made.")
×
464

465

466
if __name__ == "__main__":
1✔
467
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc