
XAMS-nikhef / amstrax / build 15042264012

15 May 2025 10:06AM UTC coverage: 41.499% (-4.3%) from 45.824%

Build 15042264012: push event via github (committer: web-flow)
Corrections Framework and Offline Processing (#317)

* patch ext peaks

* done xams config

* no fallback

* add amstrax files dep

* newlines

* add corrections services to amstrax

* remove amstrax files deps

* bug

* forgot import

* add requests req

* forgot one

* forgot one

* forgot one

* corrections not there yet

* allow branch

* tests added

* tests added

* fix eval and context default

* add corrections

* add xamsconfigs and clean unused

* forgot import

* forgot import

* forgot import

* forgot import

* allow just value in global configs

* fix use in peaks plugin

* use well elife

* setup function to avoid calling 100 times

* strange..

* add cache

* make nparray

* allow other than json in global

* Offline processing (#318)

* migrate to htcondor

* refactor processing

* remove test files

* fix

* fix

* offline works well

* fix import

* support from file

* fix import

* codefactor

* more imports

* better imports

* try tests..

* update offline processing

* only for xamsdata

* dont save records

* no modules

* bug

* bug

* do not use partial

* do not use partial

* check for production

* add queue arg

* rename and remove tests

* adjust online processing

* adjust online processing

* output folder in context

* process production

* fix name and output folder

* fix name and output folder

* fix name and output folder

* fix name and output folder

* change context folder

* fix add data entry keys

* default short queue

* run ids from file

* run ids from file

* add run ranges and special ledcal

* add run ranges and special ledcal

* add run ranges and special ledcal

* add run ranges and special ledcal

* add run ranges and special ledcal

* add run ranges and special ledcal

* missed indent

* add peaks ext and NaI

* save when also for ext

* save area per channel

* s1 naive z correction

* s1 naive z correction

* s1 naive z correction

180 of 720 new or added lines in 20 files covered. (25.0%)

4 existing lines in 3 files now uncovered.

1567 of 3776 relevant lines covered (41.5%)

1.24 hits per line
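
The headline percentages follow directly from these counts. As a quick check of the arithmetic (a minimal sketch in Python, using only numbers taken from this report):

covered, relevant = 1567, 3776
print(f"{100 * covered / relevant:.3f}%")              # 41.499%  build coverage

new_covered, new_lines = 180, 720
print(f"{100 * new_covered / new_lines:.1f}%")         # 25.0%    coverage of new/added lines

previous = 45.824
print(f"{100 * covered / relevant - previous:+.1f}%")  # -4.3%    change from previous build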

Source File: /amstrax/auto_processing_new/auto_processing.py (0.0% covered; every executable line below is new in this build and uncovered)

import argparse
import time
import os
import logging
from datetime import datetime, timedelta

from job_submission import submit_job
from db_utils import query_runs, update_processing_status

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

SETUP_FILE = "/data/xenon/xams_v2/setup.sh"

def parse_args():
    """
    Parse command-line arguments for online auto-processing.
    """
    parser = argparse.ArgumentParser(description="Autoprocess XAMS live data")
    parser.add_argument(
        "--target",
        nargs="*",
        default=["raw_records"],
        help="Target final data type to produce (e.g., raw_records, peaks, events).",
    )
    parser.add_argument("--output_folder", default="/data/xenon/xams_v2/xams_processed", help="Path where to save processed data")
    parser.add_argument("--timeout", default=20, type=int, help="Time (in seconds) between checks.")
    parser.add_argument("--max_jobs", default=5, type=int, help="Maximum number of jobs to submit simultaneously.")
    parser.add_argument("--run_id", default=None, help="Single run ID to process manually.")
    parser.add_argument("--mem", default=8000, type=int, help="Memory per CPU in MB.")
    parser.add_argument("--logs_path", default="/data/xenon/xams_v2/logs/", help="Path to save job logs.")
    parser.add_argument("--production", action="store_true", help="Run in production mode (will update the rundb).")
    parser.add_argument("--set_config_kwargs", default="{}", help="Dictionary of kwargs to pass to set_config.")
    parser.add_argument("--set_context_kwargs", default="{}", help="Dictionary of kwargs to pass to set_context.")
    parser.add_argument("--amstrax_path", default=None, help="Path to the amstrax directory.")
    parser.add_argument("--corrections_version", default=None, help="Version of corrections to use.")
    parser.add_argument("--queue", default="short", help="Queue to submit jobs to. See Nikhef docs for options.")

    return parser.parse_args()


def main(args):
    """
    Main function for continuous auto-processing of XAMS live data.
    """
    log.info("Starting XAMS auto-processing script...")
    amstrax_dir = amstrax.amstrax_dir  # Path to the amstrax directory
    nap_time = int(args.timeout)
    output_folder = args.output_folder
    targets = " ".join(args.target)
    runs_col = amstrax.get_mongo_collection()

    client = amstrax.get_mongo_client()
    processing_db = client["daq"]["processing"]

    while True:
        # Check if auto-processing is enabled
        auto_processing_on = processing_db.find_one({"name": "auto_processing"})["status"] == "on"

        # Update task list and check for new runs
        run_docs_to_do = update_task_list(args, runs_col, auto_processing_on)

        # Handle running jobs
        handle_running_jobs(runs_col, production=args.production)

        if not run_docs_to_do:
            log.info(f"No runs to process. Sleeping for {nap_time} seconds.")
            time.sleep(nap_time)
            continue

        # Submit new jobs if below max limit
        submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir)

        if args.run_id:
            log.info("Finished processing run.")
            break

        log.info(f"Sleeping for {nap_time} seconds before next check...")
        time.sleep(nap_time)


def update_task_list(args, runs_col, auto_processing_on):
    """
    Update and return the list of tasks to be processed based on MongoDB queries.
    """
    query = {
        "data": {"$elemMatch": {"type": "live", "host": "stbc"}},
        "$or": [
            {
                "data": {"$not": {"$elemMatch": {"host": "stbc", "type": "raw_records"}}},
                "processing_failed": {"$not": {"$gt": 3}},
                "processing_status.status": {"$not": {"$in": ["running", "submitted"]}},
                "tags": {"$not": {"$elemMatch": {"name": "abandon"}}},
                "start": {"$gt": datetime.today() - timedelta(days=100)},
            },
            {"tags": {"$elemMatch": {"name": "process"}}},
        ],
    }

    if not auto_processing_on:
        query = {"data": {"$elemMatch": {"type": "live", "host": "stbc"}}, "tags": {"$elemMatch": {"name": "process"}}}

    projection = {"number": 1, "start": 1, "end": 1, "data": 1, "processing_status": 1, "processing_failed": 1}
    sort = [("number", -1)]
    run_docs_to_do = list(runs_col.find(query, projection).sort(sort))

    if args.run_id:
        run_docs_to_do = [runs_col.find_one({"number": int(args.run_id)}, projection)]

    if run_docs_to_do:
        log.info(f'Found {len(run_docs_to_do)} runs to process: {[run_doc["number"] for run_doc in run_docs_to_do]}')
    return run_docs_to_do


def handle_running_jobs(runs_col, production=False):
    """
    Check and update the status of running jobs. Mark jobs as failed if they've been running too long.
    """
    query = {"processing_status.status": {"$in": ["submitted", "running"]}}
    projection = {"number": 1, "processing_status": 1}
    run_docs_running = list(runs_col.find(query, projection))

    for run_doc in run_docs_running:
        processing_status = run_doc["processing_status"]
        run_number = run_doc["number"]

        # Mark jobs as failed if they've been running or submitted for over 30 minutes
        if processing_status["status"] in ["running", "submitted"]:
            if processing_status["time"] < datetime.now() - timedelta(minutes=30):
                new_status = "failed"
                log.info(
                    f'Run {run_number} has been {processing_status["status"]} for more than 30 minutes, marking as {new_status}'
                )

                if production:
                    update_processing_status(run_number, new_status, production=production, is_online=True)
                else:
                    log.info(f"Would have updated run {run_number} to {new_status}")


def submit_new_jobs(args, runs_col, run_docs_to_do, amstrax_dir):
    """
    Submit new jobs if the current number of running/submitted jobs is below the max_jobs limit.
    """
    query = {"processing_status.status": {"$in": ["submitted", "running"]}}
    projection = {"number": 1, "processing_status": 1}
    run_docs_running = list(runs_col.find(query, projection))

    num_running_jobs = len(run_docs_running)
    log.info(f"Found {num_running_jobs} running jobs.")

    if num_running_jobs >= args.max_jobs:
        log.info(f"Too many jobs running ({num_running_jobs}/{args.max_jobs}).")
        return

    max_jobs_to_submit = args.max_jobs - num_running_jobs
    will_do_run_ids = [int(run_doc["number"]) for run_doc in run_docs_to_do[:max_jobs_to_submit]]
    log.info(f"Submitting jobs for runs: {will_do_run_ids}")

    for run_doc in run_docs_to_do[:max_jobs_to_submit]:
        run_id = f'{int(run_doc["number"]):06}'
        job_name = f"process_{run_id}_online"

        production_flag = "--production" if args.production else ""
        targets = " ".join(args.target)

        arguments = []
        arguments.append(f"--run_id {run_id}")
        arguments.append(f"--targets {targets}")
        arguments.append(f"--output_folder {args.output_folder}")
        if args.corrections_version:
            arguments.append(f"--corrections_version {args.corrections_version}")
        if args.amstrax_path:
            arguments.append(f"--amstrax_path {args.amstrax_path}")
        if args.production:
            arguments.append("--production")
            arguments.append("--allow_raw_records")
            arguments.append("--is_online")
        arguments = " ".join(arguments)

        # Now using process.py instead of make_raw_records.py
        jobstring = f"""
        echo "Processing run {run_id} at $(date)"
        source {SETUP_FILE}
        cd {os.path.dirname(os.path.realpath(__file__))}
        pwd
        python process.py {arguments}
        echo "Job complete!"
        echo `date`
        """

        if args.production:
            submit_job(
                jobstring=jobstring,
                jobname=job_name,
                log_dir=args.logs_path,
                queue=args.queue,
                mem_per_cpu=args.mem,
                cpus_per_task=1,
            )

            update_processing_status(
                run_id, "submitted", pull={"tags": {"name": "process"}}, production=args.production, is_online=True
            )
        else:
            log.info(f"Would have submitted job for run {run_id}")


if __name__ == "__main__":
    args = parse_args()

    log_name = "auto_processing_online"
    import amstrax

    versions = amstrax.print_versions(
        modules="strax amstrax numpy numba".split(), include_git=False, return_string=True
    )

    log = amstrax.get_daq_logger(
        log_name, log_name, level=logging.DEBUG, opening_message=f"Using versions: {versions}", logdir=args.logs_path
    )

    main(args)
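
The helpers imported at the top, submit_job (from job_submission) and update_processing_status (from db_utils), live in sibling modules that this report does not show. Inferring only from the call sites above, stubs compatible with this script would look roughly like the following; this is a sketch of the assumed interface, not the actual modules:

# Stubs inferred from the call sites in auto_processing.py above.
# The real job_submission.py and db_utils.py are not part of this report;
# parameter names and defaults here are assumptions for illustration only.

def submit_job(jobstring, jobname, log_dir, queue, mem_per_cpu, cpus_per_task=1):
    """Wrap the shell commands in `jobstring` into a batch job named `jobname`,
    submit it to the given cluster `queue` (HTCondor, per the commit log above),
    and write the job logs under `log_dir`."""
    raise NotImplementedError("see job_submission.py in the repository")

def update_processing_status(run_id, status, pull=None, production=False, is_online=False):
    """Record `status` ('submitted', 'running', 'failed', ...) for `run_id` in the
    run database; `pull`, if given, removes matching entries (e.g. the 'process'
    tag) from the run document."""
    raise NotImplementedError("see db_utils.py in the repository")

Note that without --production the script is effectively a dry run: it logs the jobs it would submit and the statuses it would update without touching the cluster or the run database, so an invocation like python auto_processing.py --run_id <number> --target raw_records peaks can be tried safely.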