• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

x4dr / Okysa / 21275609216

23 Jan 2026 05:22AM UTC coverage: 81.146% (-0.5%) from 81.616%
21275609216

push

github

x4dr
env vars

4 of 10 new or added lines in 2 files covered. (40.0%)

75 existing lines in 1 file now uncovered.

2720 of 3352 relevant lines covered (81.15%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.18
/scripts/install.py
1
#!/usr/bin/env python3
2
import argparse
1✔
3
import getpass
1✔
4
import json
1✔
5
import os
1✔
6
import re
1✔
7
import secrets
1✔
8
import subprocess
1✔
9
from pathlib import Path
1✔
10

11

12
def get_input(prompt, default=None):
1✔
13
    if default:
×
14
        res = input(f"{prompt} [{default}]: ").strip()
×
15
        return res if res else default
×
16
    return input(f"{prompt}: ").strip()
×
17

18

19
def get_current_user():
1✔
20
    try:
×
21
        return os.getlogin()
×
22
    except OSError:
×
23
        return getpass.getuser()
×
24

25

26
def detect_domains():
1✔
27
    domains = []
×
28
    sites_enabled = Path("/etc/nginx/sites-enabled")
×
29
    if not sites_enabled.exists():
×
30
        return domains
×
31

32
    for site in sites_enabled.iterdir():
×
33
        try:
×
34
            content = site.read_text()
×
35
            matches = re.findall(r"server_name\s+([^;]+);", content)
×
36
            for match in matches:
×
37
                for domain in match.split():
×
38
                    if domain and not domain.startswith("_"):
×
39
                        domains.append(domain)
×
40
        except Exception:
×
41
            continue
×
42
    return sorted(list(set(domains)))
×
43

44

45
def get_repo_nwo():
1✔
46
    try:
×
47
        res = subprocess.run(
×
48
            ["gh", "repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner"],
49
            capture_output=True,
50
            text=True,
51
            check=True,
52
        )
53
        return res.stdout.strip()
×
54
    except subprocess.CalledProcessError:
×
55
        return None
×
56

57

58
def register_webhook(domain, existing_secret):
1✔
59
    print("\n--- GitHub Webhook Registration ---")
×
60
    use_gh = get_input("Use 'gh' CLI to register/update webhook?", "y")
×
61
    if use_gh.lower() != "y":
×
62
        return None
×
63

64
    if subprocess.run(["which", "gh"], capture_output=True).returncode != 0:
×
65
        print("Error: 'gh' CLI not found.")
×
66
        return None
×
67

68
    # Check GH auth status
69
    auth_check = subprocess.run(["gh", "auth", "status"], capture_output=True)
×
70
    if auth_check.returncode != 0:
×
71
        print("GitHub CLI is not authenticated.")
×
72
        if get_input("Run 'gh auth login' now?", "y").lower() == "y":
×
73
            try:
×
74
                subprocess.run(["gh", "auth", "login"], check=True)
×
75
            except subprocess.CalledProcessError:
×
76
                return None
×
77
        else:
78
            return None
×
79

80
    nwo = get_repo_nwo()
×
81
    if not nwo:
×
82
        print("Could not determine repository name.")
×
83
        return None
×
84

85
    webhook_url = f"https://{domain}/webhook"
×
86

87
    # Generate new secret if needed
88
    secret = existing_secret or secrets.token_urlsafe(32)
×
89

90
    # Check for existing webhook
UNCOV
91
    try:
×
UNCOV
92
        hooks_json = subprocess.run(
×
93
            ["gh", "api", f"repos/{nwo}/hooks"],
94
            capture_output=True,
95
            text=True,
96
            check=True,
97
        ).stdout
UNCOV
98
        hooks = json.loads(hooks_json)
×
UNCOV
99
        existing_hook = next(
×
100
            (h for h in hooks if h.get("config", {}).get("url") == webhook_url), None
101
        )
102

UNCOV
103
        hook_data = {
×
104
            "active": True,
105
            "events": ["workflow_run"],
106
            "config": {
107
                "url": webhook_url,
108
                "content_type": "json",
109
                "secret": secret,
110
            },
111
        }
112

113
        if existing_hook:
×
UNCOV
114
            print(f"Found existing webhook (ID: {existing_hook['id']}). Updating...")
×
115
            endpoint = f"repos/{nwo}/hooks/{existing_hook['id']}"
×
116
            method = "PATCH"
×
117
        else:
118
            print("Creating new webhook...")
×
UNCOV
119
            hook_data["name"] = "web"
×
120
            endpoint = f"repos/{nwo}/hooks"
×
UNCOV
121
            method = "POST"
×
122

UNCOV
123
        subprocess.run(
×
124
            ["gh", "api", endpoint, "--method", method, "--input", "-"],
125
            input=json.dumps(hook_data),
126
            text=True,
127
            check=True,
128
        )
129
        print(f"Successfully registered/updated webhook: {webhook_url}")
×
130
        return secret
×
UNCOV
131
    except Exception as e:
×
UNCOV
132
        print(f"Failed to manage webhook: {e}")
×
UNCOV
133
        return None
×
134

135

136
def uninstall(okysa_root):
1✔
137
    print("\n--- Uninstalling Okysa Deployment ---")
×
138

139
    # Stop and disable services
140
    services = ["okysa.service", "okysa-webhook.service"]
×
141
    for svc in services:
×
142
        print(f"Stopping and disabling {svc}...")
×
UNCOV
143
        subprocess.run(["sudo", "systemctl", "stop", svc], stderr=subprocess.DEVNULL)
×
UNCOV
144
        subprocess.run(["sudo", "systemctl", "disable", svc], stderr=subprocess.DEVNULL)
×
UNCOV
145
        subprocess.run(
×
146
            ["sudo", "rm", f"/etc/systemd/system/{svc}"], stderr=subprocess.DEVNULL
147
        )
148

149
    # Nginx
150
    sites_available = Path("/etc/nginx/sites-available")
×
151
    if sites_available.exists():
×
152
        for site in sites_available.iterdir():
×
153
            try:
×
154
                content = site.read_text()
×
UNCOV
155
                if "# Managed by Okysa Installer" in content:
×
UNCOV
156
                    print(f"Removing Nginx config: {site.name}")
×
UNCOV
157
                    subprocess.run(
×
158
                        ["sudo", "rm", f"/etc/nginx/sites-enabled/{site.name}"],
159
                        stderr=subprocess.DEVNULL,
160
                    )
UNCOV
161
                    subprocess.run(["sudo", "rm", str(site)], stderr=subprocess.DEVNULL)
×
UNCOV
162
            except:
×
163
                continue
×
164

165
    # Sudoers
UNCOV
166
    print("Removing sudoers entry...")
×
UNCOV
167
    subprocess.run(
×
168
        ["sudo", "rm", "/etc/sudoers.d/okysa-deploy"], stderr=subprocess.DEVNULL
169
    )
170

UNCOV
171
    subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
UNCOV
172
    print("\nUninstallation complete.")
×
173

174

175
def load_env(env_path):
1✔
176
    env_vars = {}
×
177
    if env_path.exists():
×
178
        with open(env_path, "r") as f:
×
179
            for line in f:
×
180
                line = line.strip()
×
181
                if "=" in line and not line.startswith("#"):
×
UNCOV
182
                    k, v = line.split("=", 1)
×
UNCOV
183
                    env_vars[k.strip()] = v.strip().strip('"').strip("'")
×
UNCOV
184
    return env_vars
×
185

186

187
def save_env(env_path, env_vars):
1✔
UNCOV
188
    with open(env_path, "w") as f:
×
UNCOV
189
        for k, v in sorted(env_vars.items()):
×
UNCOV
190
            f.write(f'{k}="{v}"\n')
×
191

192

193
def main():
1✔
UNCOV
194
    parser = argparse.ArgumentParser(description="Okysa Deployment Installer")
×
195
    parser.add_argument(
×
196
        "--uninstall", action="store_true", help="Uninstall the deployment"
197
    )
198
    args = parser.parse_args()
×
199

200
    script_path = Path(__file__).resolve()
×
UNCOV
201
    okysa_root = script_path.parent.parent
×
202
    env_path = okysa_root / ".env"
×
203
    home_dir = str(Path.home())
×
204

UNCOV
205
    if args.uninstall:
×
206
        uninstall(okysa_root)
×
207
        return
×
208

209
    gamepack_root = okysa_root.parent / "GamePack"
×
210
    uv_path = subprocess.getoutput("which uv") or "uv"
×
211

212
    print("--- Path Detection ---")
×
UNCOV
213
    print(f"Okysa Root:    {okysa_root}")
×
214
    if gamepack_root.exists():
×
215
        print(f"GamePack Root: {gamepack_root}")
×
216
    else:
UNCOV
217
        print(f"WARNING: GamePack not found at {gamepack_root}")
×
UNCOV
218
        gamepack_root = Path(
×
219
            get_input("Enter absolute path to GamePack", str(gamepack_root))
220
        )
221

222
    # 2. Gather Configuration
223
    print("\n--- Configuration ---")
×
UNCOV
224
    user = get_input("System user to run the bot", get_current_user())
×
225

226
    env_vars = load_env(env_path)
×
227

UNCOV
228
    def to_generic(p):
×
229
        return str(p).replace(home_dir, "~")
×
230

231
    # Required Env Vars
232
    env_vars["NOSSI"] = get_input(
×
233
        "NOSSI (domain for web services)", env_vars.get("NOSSI", "nossinet.cc")
234
    )
UNCOV
235
    env_vars["OLLAMA"] = get_input(
×
236
        "OLLAMA (URL for AI API)",
237
        env_vars.get("OLLAMA", "http://localhost:11434"),
238
    )
UNCOV
239
    env_vars["WIKI"] = get_input(
×
240
        "WIKI (path to wiki files)",
241
        env_vars.get("WIKI", to_generic(okysa_root.parent / "wiki")),
242
    )
UNCOV
243
    env_vars["STORAGE"] = get_input(
×
244
        "STORAGE (path to storage JSON)",
245
        env_vars.get("STORAGE", to_generic(okysa_root / "Golconda_storage.json")),
246
    )
247

248
    # Database detection and prompts
NEW
249
    db_files = list(okysa_root.glob("*.db")) + list(
×
250
        (okysa_root / "Golconda").glob("*.db")
251
    )
NEW
252
    db_files = [to_generic(f) for f in db_files]
×
NEW
253
    if db_files:
×
NEW
254
        print(f"\nDetected existing database files: {', '.join(db_files)}")
×
255

UNCOV
256
    env_vars["DATABASE"] = get_input(
×
257
        "DATABASE (path to main okysa.db)",
258
        env_vars.get("DATABASE", to_generic(okysa_root / "okysa.db")),
259
    )
NEW
260
    env_vars["REMIND_DATABASE"] = get_input(
×
261
        "REMIND_DATABASE (path to remind.db)",
262
        env_vars.get(
263
            "REMIND_DATABASE", to_generic(okysa_root / "Golconda" / "remind.db")
264
        ),
265
    )
266

267
    if "DISCORD_TOKEN" not in env_vars:
×
268
        token_path = Path.home() / "token.discord"
×
269
        default_token = ""
×
UNCOV
270
        if token_path.exists():
×
UNCOV
271
            default_token = token_path.read_text().strip()
×
UNCOV
272
        env_vars["DISCORD_TOKEN"] = get_input(
×
273
            "DISCORD_TOKEN", env_vars.get("DISCORD_TOKEN", default_token)
274
        )
275

276
    detected_domains = detect_domains()
×
277
    if detected_domains:
×
UNCOV
278
        print("\nDetected domains from Nginx:")
×
279
        for i, d in enumerate(detected_domains):
×
280
            print(f"  {i + 1}. {d}")
×
281
        choice = get_input(
×
282
            f"Choose domain (1-{len(detected_domains)}) or enter new", "1"
283
        )
284
        if choice.isdigit() and 1 <= int(choice) <= len(detected_domains):
×
UNCOV
285
            domain = detected_domains[int(choice) - 1]
×
286
        else:
287
            domain = choice
×
288
    else:
UNCOV
289
        domain = get_input("Domain for Nginx", env_vars.get("NOSSI", "nossinet.cc"))
×
290

291
    # 3. Prepare scripts
292
    deploy_sh_path = okysa_root / "scripts" / "deploy.sh"
×
UNCOV
293
    webhook_py_path = okysa_root / "scripts" / "webhook_listener.py"
×
294

295
    # Ensure scripts are executable
296
    os.chmod(deploy_sh_path, 0o755)
×
UNCOV
297
    os.chmod(webhook_py_path, 0o755)
×
298

299
    # 4. Generate Systemd Units
300
    # Use %h for home directory to avoid hardcoding the username in the unit files
UNCOV
301
    home_dir = str(Path.home())
×
UNCOV
302
    okysa_root_generic = str(okysa_root).replace(home_dir, "%h")
×
UNCOV
303
    env_path_generic = str(env_path).replace(home_dir, "%h")
×
304
    uv_path_generic = str(uv_path).replace(home_dir, "%h")
×
305

306
    bot_service = f"""[Unit]
×
307
Description=Okysa Discord Bot
308
After=network.target
309

310
[Service]
311
Type=simple
312
User={user}
313
WorkingDirectory={okysa_root_generic}
314
EnvironmentFile={env_path_generic}
315
ExecStart={uv_path_generic} run Okysa.py
316
Restart=always
317

318
[Install]
319
WantedBy=multi-user.target
320
"""
321

UNCOV
322
    webhook_service = f"""[Unit]
×
323
Description=Okysa Webhook Listener
324
After=network.target
325

326
[Service]
327
Type=simple
328
User={user}
329
WorkingDirectory={okysa_root_generic}
330
EnvironmentFile={env_path_generic}
331
ExecStart=/usr/bin/python3 {okysa_root_generic}/scripts/webhook_listener.py
332
Restart=always
333

334
[Install]
335
WantedBy=multi-user.target
336
"""
337

338
    # 5. Generate Nginx Config
UNCOV
339
    nginx_conf = f"""# Managed by Okysa Installer
×
340
server {{
341
    listen 80;
342
    server_name {domain};
343

344
    location /webhook {{
345
        proxy_pass http://localhost:5000;
346
        proxy_set_header Host $host;
347
        proxy_set_header X-Real-IP $remote_addr;
348
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
349
        proxy_set_header X-Forwarded-Proto $scheme;
350
    }}
351
}}
352
"""
353

UNCOV
354
    print("\n--- Proposed Actions ---")
×
UNCOV
355
    print(f"1. Save configuration to {env_path}")
×
UNCOV
356
    print("2. Write /etc/systemd/system/okysa.service")
×
357
    print("3. Write /etc/systemd/system/okysa-webhook.service")
×
358
    print(f"4. Write /etc/nginx/sites-available/{domain}")
×
359
    print("5. Add sudoers entry for systemctl restart")
×
360

361
    if get_input("Perform these actions? (requires sudo)", "n").lower() == "y":
×
UNCOV
362
        save_env(env_path, env_vars)
×
363
        print(f"Configuration saved to {env_path}")
×
364

365
        def sudo_write(content, path):
×
366
            subprocess.run(
×
367
                ["sudo", "tee", str(path)],
368
                input=content.encode(),
369
                stdout=subprocess.DEVNULL,
370
            )
371

372
        sudo_write(bot_service, "/etc/systemd/system/okysa.service")
×
373
        sudo_write(webhook_service, "/etc/systemd/system/okysa-webhook.service")
×
374
        sudo_write(nginx_conf, f"/etc/nginx/sites-available/{domain}")
×
375

376
        sudoers_line = (
×
377
            f"{user} ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart okysa.service\n"
378
        )
379
        sudo_write(sudoers_line, "/etc/sudoers.d/okysa-deploy")
×
380

381
        if (
×
382
            get_input(f"Link /etc/nginx/sites-enabled/{domain}? (y/n)", "y").lower()
383
            == "y"
384
        ):
385
            subprocess.run(
×
386
                [
387
                    "sudo",
388
                    "ln",
389
                    "-sf",
390
                    f"/etc/nginx/sites-available/{domain}",
391
                    f"/etc/nginx/sites-enabled/{domain}",
392
                ]
393
            )
394
            # Check Nginx config and reload
395
            print("Checking Nginx configuration...")
×
396
            if subprocess.run(["sudo", "nginx", "-t"]).returncode == 0:
×
397
                print("Reloading Nginx...")
×
UNCOV
398
                subprocess.run(["sudo", "systemctl", "reload", "nginx"])
×
399
            else:
UNCOV
400
                print("WARNING: Nginx configuration test failed. Not reloading.")
×
401

402
        subprocess.run(["sudo", "systemctl", "daemon-reload"])
×
UNCOV
403
        if get_input("Enable and start services now? (y/n)", "y").lower() == "y":
×
404
            subprocess.run(
×
405
                ["sudo", "systemctl", "enable", "--now", "okysa-webhook", "okysa"]
406
            )
407

408
        # Register webhook and only update secret if user said yes
UNCOV
409
        new_secret = register_webhook(domain, env_vars.get("GITHUB_WEBHOOK_SECRET"))
×
UNCOV
410
        if new_secret:
×
UNCOV
411
            env_vars["GITHUB_WEBHOOK_SECRET"] = new_secret
×
UNCOV
412
            save_env(env_path, env_vars)
×
UNCOV
413
            print(f"Updated {env_path} with the registered webhook secret.")
×
UNCOV
414
            print(f"Secret: {new_secret}")
×
415
        else:
UNCOV
416
            print("Webhook registration skipped or failed. Secret not updated in .env.")
×
UNCOV
417
            current_secret = env_vars.get("GITHUB_WEBHOOK_SECRET")
×
UNCOV
418
            if current_secret:
×
UNCOV
419
                print(f"Current secret preserved: {current_secret}")
×
420

UNCOV
421
        print("\nInstallation complete.")
×
422
    else:
UNCOV
423
        print("\nInstallation aborted. No changes made.")
×
424

425

426
if __name__ == "__main__":
1✔
UNCOV
427
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc