• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

desy-multimessenger / nuztf / 13725459265

07 Mar 2025 04:58PM UTC coverage: 71.901% (-1.7%) from 73.615%
13725459265

push

github

web-flow
Add Kowalski Backend (#490)

471 of 640 new or added lines in 26 files covered. (73.59%)

15 existing lines in 4 files now uncovered.

1978 of 2751 relevant lines covered (71.9%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.48
/nuztf/fritz.py
1
import os
1✔
2

3
import backoff
1✔
4
import requests  # type: ignore
1✔
5

6
from nuztf.credentials import load_credentials
1✔
7

8
# Base URL of the Fritz API; fritz_api() appends the endpoint extension to it
9

10
API_BASEURL = "https://fritz.science"
1✔
11

12

13
def get_fritz_token() -> str:
    """
    Look up the Fritz API token

    The lookup is delegated to ``load_credentials`` (token-based mode), which
    is expected to read it from the environment variable or the credentials
    file.

    :return: Fritz token
    """
    return load_credentials("fritz", token_based=True)
×
20

21

22
def fritz_api(method: str, endpoint_extension: str, data: dict = None):
    """
    Make a request to the Fritz API

    The payload is sent as a JSON body for POST requests and as URL query
    parameters for GET requests.

    :param method: HTTP method, "get" or "post" (case-insensitive)
    :param endpoint_extension: Endpoint extension (e.g. "api/sources")
    :param data: Data to send (e.g. {"ra": 0, "dec": 0}), or None for no payload
    :return: The response object returned by ``requests.request``
    :raises ValueError: If ``method`` is neither "get" nor "post"
    """
    # Validate first so an unsupported method fails before credentials are
    # loaded or any network traffic happens.
    verb = method.upper() if isinstance(method, str) else method
    if verb not in ("GET", "POST"):
        raise ValueError("You have to use either 'get' or 'post'")

    # URLs always use "/" as separator. os.path.join would insert the OS path
    # separator (a backslash on Windows) and would discard the base URL
    # entirely if the extension started with "/".
    endpoint = f"{API_BASEURL.rstrip('/')}/{endpoint_extension.lstrip('/')}"
    headers = {"Authorization": f"token {get_fritz_token()}"}

    if verb == "POST":
        return requests.request(verb, endpoint, json=data, headers=headers)
    return requests.request(verb, endpoint, params=data, headers=headers)
×
40

41

42
@backoff.on_exception(
    backoff.expo, requests.exceptions.RequestException, max_time=60
)
def save_source_to_group(object_id: str, group_id: int):
    """
    Save a source to a Fritz group

    Posts the object id together with the group id (listed under
    "inviteGroupIds") to the "api/source_groups" endpoint. The call is
    wrapped in a backoff decorator that retries on
    ``requests.exceptions.RequestException`` with ``max_time=60``.

    :param object_id: Id of the object to save
    :param group_id: Id of the group to save the object to
    :return: Whatever ``fritz_api`` returns for the request
    """
    invite_request = {"objId": object_id, "inviteGroupIds": [group_id]}
    return fritz_api("POST", "api/source_groups", invite_request)
55

56

57
def delete_source_from_group(object_id: str, group_id: int):
    """
    Remove a source from a Fritz group

    Posts the object id together with the group id (listed under
    "unsaveGroupIds") to the "api/source_groups" endpoint.

    :param object_id: Id of the object to remove
    :param group_id: Id of the group to remove the object from
    :return: Whatever ``fritz_api`` returns for the request
    """
    unsave_request = {"objId": object_id, "unsaveGroupIds": [group_id]}
    return fritz_api("POST", "api/source_groups", unsave_request)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc