• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dimagi / commcare-export / 20761213144

06 Jan 2026 08:31PM UTC coverage: 81.891%. First build
20761213144

Pull #257

github

web-flow
Merge 37fe94d61 into b5d79a40f
Pull Request #257: Logging

297 of 351 branches covered (84.62%)

15 of 21 new or added lines in 1 file covered. (71.43%)

3898 of 4760 relevant lines covered (81.89%)

4.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.37
/commcare_export/cli.py
1
import argparse
5✔
2
import getpass
5✔
3
import io
5✔
4
import json
5✔
5
import os
5✔
6
import sys
5✔
7
import logging
5✔
8
import dateutil.parser
5✔
9
import requests
5✔
10
import sqlalchemy
5✔
11

12
from commcare_export import builtin_queries, excel_query, writers
5✔
13
from commcare_export.checkpoint import CheckpointManagerProvider
5✔
14
from commcare_export.commcare_hq_client import (
5✔
15
    LATEST_KNOWN_VERSION,
16
    CommCareHqClient,
17
    ResourceRepeatException,
18
)
19
from commcare_export.commcare_minilinq import CommCareHqEnv
5✔
20
from commcare_export.env import BuiltInEnv, EmitterEnv, JsonPathEnv
5✔
21
from commcare_export.exceptions import (
5✔
22
    DataExportException,
23
    MissingQueryFileException,
24
)
25
from commcare_export.location_info_provider import LocationInfoProvider
5✔
26
from commcare_export.minilinq import List, MiniLinq
5✔
27
from commcare_export.misc import default_to_json
5✔
28
from commcare_export.repeatable_iterator import RepeatableIterator
5✔
29
from commcare_export.utils import get_checkpoint_manager
5✔
30
from commcare_export.version import __version__
5✔
31
from commcare_export import get_logger, get_error_logger
5✔
32

33
# Exit statuses returned by main_with_args() / evaluate_query() and handed
# to sys.exit() by main().
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_ERROR = 1
# Module-level logger used by the functions in this file.
logger = get_logger(__file__)

# Short names accepted by --commcare-hq, mapped to the base URL they stand
# for. Any value that is not a key here is used as the base URL itself.
commcare_hq_aliases = {
    'local': 'http://localhost:8000',
    'prod': 'https://www.commcarehq.org'
}
41

42

43
class Argument:
    """
    Declarative description of one ``--option`` of the command line.

    The positional and keyword arguments given at construction are kept
    and later forwarded to ``ArgumentParser.add_argument()``.
    """

    def __init__(self, name, *args, **kwargs):
        # Attribute-style name, as argparse exposes it on the namespace.
        self.name = name.replace('-', '_')
        # The long flag always comes first, followed by any extra flags.
        self._args = [f'--{name}', *args]
        self._kwargs = kwargs

    @property
    def default(self):
        """The ``default`` keyword given at construction, else None."""
        return self._kwargs.get('default')

    def add_to_parser(self, parser, **additional_kwargs):
        """
        Register this argument on ``parser``.

        ``additional_kwargs`` supply fallbacks only: any keyword that was
        given at construction wins over the one passed here.
        """
        merged_kwargs = {**additional_kwargs, **self._kwargs}
        parser.add_argument(*self._args, **merged_kwargs)
5✔
57

58

59
# Every command-line option understood by commcare-export. main() adds each
# entry to the argument parser in this order.
CLI_ARGS = [
    Argument(
        'version',
        default=False,
        action='store_true',
        help='Print the current version of the commcare-export tool.'
    ),
    Argument('query', required=False, help='JSON or Excel query file'),
    Argument('dump-query', default=False, action='store_true'),
    Argument(
        'commcare-hq',
        default='prod',
        help='Base url for the CommCare HQ instance e.g. '
        'https://www.commcarehq.org'
    ),
    Argument('api-version', default=LATEST_KNOWN_VERSION),
    Argument('project'),
    Argument('username'),
    Argument(
        'password',
        help='Enter password, or if using apikey auth-mode, enter the api key.'
    ),
    Argument(
        'auth-mode',
        default='password',
        choices=['password', 'apikey'],
        # The help text names the values that `choices` actually accepts.
        help='Use "password" auth, or "apikey" auth (for two factor enabled '
        'domains).'
    ),
    Argument(
        'since',
        help='Export all data after this date. Format YYYY-MM-DD or '
        'YYYY-MM-DDTHH:mm:SS'
    ),
    Argument(
        'until',
        help='Export all data up until this date. Format YYYY-MM-DD or '
        'YYYY-MM-DDTHH:mm:SS'
    ),
    Argument(
        'start-over',
        default=False,
        action='store_true',
        help='When saving to a SQL database; the default is to pick up '
        'since the last success. This disables that.'
    ),
    Argument('verbose', default=False, action='store_true'),
    Argument(
        'output-format',
        default='json',
        choices=['json', 'csv', 'xls', 'xlsx', 'sql', 'markdown'],
        help='Output format'
    ),
    Argument(
        'output',
        metavar='PATH',
        default='reports.zip',
        help='Path to output; defaults to `reports.zip`.'
    ),
    Argument(
        'strict-types',
        default=False,
        action='store_true',
        help="When saving to a SQL database don't allow changing column types "
        "once they are created."
    ),
    Argument(
        'missing-value',
        default=None,
        help="Value to use when a field is missing from the form / case."
    ),
    Argument(
        'batch-size',
        default=200,
        # Without a type, a user-supplied value would arrive as a string
        # while the default is an int; parse it so both are ints.
        type=int,
        help="Number of records to process per batch."
    ),
    Argument(
        'checkpoint-key',
        help="Use this key for all checkpoints instead of the query file MD5 "
        "hash in order to prevent table rebuilds after a query file has "
        "been edited."
    ),
    Argument(
        'users',
        default=False,
        action='store_true',
        help="Export a table containing data about this project's mobile "
        "workers"
    ),
    Argument(
        'locations',
        default=False,
        action='store_true',
        help="Export a table containing data about this project's locations"
    ),
    Argument(
        'with-organization',
        default=False,
        action='store_true',
        help="Export tables containing mobile worker data and location data "
        "and add a commcare_userid field to any exported form or case"
    ),
    Argument(
        'export-root-if-no-subdocument',
        default=False,
        action='store_true',
        help="Use this when you are exporting a nested document e.g. "
        "form.form..case, messaging-event.messages.[*] And you want to "
        "have a record exported even if the nested document does not "
        "exist or is empty.",
    ),
    Argument(
        'no-logfile',
        default=False,
        help="Specify in order to prevent information being logged to the log file and"
             " show all output in the console.",
        action='store_true',
    ),
    Argument(
        'log-dir',
        default=None,
        help="Directory where the log file (commcare_export.log) will be written. "
             "Defaults to the current working directory. Log entries are appended "
             "to preserve history across runs."
    ),
]
185

186

187
def set_up_logging(log_dir=None):
    """
    Configure logging to append to ``commcare_export.log``.

    The target directory is created when it is missing and the log file
    is opened once in append mode to confirm that it is writable before
    logging is configured. On success ``sys.stderr`` is replaced with the
    object returned by ``get_error_logger()``.

    :param log_dir: Directory that will hold the log file. The current
        working directory is used when this is None.
    :returns tuple: ``(success, log_file_path, error_msg)``;
        ``error_msg`` is None on success and the text of the OS error
        otherwise.
    """
    target_dir = os.getcwd() if log_dir is None else log_dir
    log_file = os.path.join(target_dir, "commcare_export.log")

    try:
        os.makedirs(target_dir, exist_ok=True)

        # Opening for append proves the file is writable (creating it if
        # needed) without touching entries from earlier runs.
        with open(log_file, 'a'):
            pass

        logging.basicConfig(
            filename=log_file,
            format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
            filemode='a',
        )
        sys.stderr = get_error_logger()
    except OSError as err:
        # IOError is an alias of OSError and PermissionError a subclass,
        # so this single clause covers all three.
        return False, log_file, str(err)

    return True, log_file, None
5✔
215

216

217
def main(argv):
    """
    Parse the command line, configure logging and run the export.

    Unless an exception propagates, the function ends by calling
    ``sys.exit()``: with 0 after ``--version``, with 1 when ``--project``
    is missing, and otherwise with the status returned by
    ``main_with_args()``.

    :param argv: Command-line arguments, without the program name.
    :raises Exception: If the output file name does not suit the chosen
        output format. Exceptions raised by ``main_with_args()`` are
        re-raised after a console notice.
    """
    parser = argparse.ArgumentParser(
        'commcare-export', 'Output a customized export of CommCareHQ data.'
    )
    for arg in CLI_ARGS:
        arg.add_to_parser(parser)

    args = parser.parse_args(argv)

    if args.output_format and args.output:
        errors = validate_output_filename(args.output_format, args.output)
        if errors:
            raise Exception(f"Could not proceed. Following issues were found: {', '.join(errors)}.")

    if not args.no_logfile:
        success, log_file, error = set_up_logging(args.log_dir)
        if success:
            print(f'Writing logs to {log_file}')
        else:
            print(f'Warning: Unable to write to log file {log_file}: {error}')
            print('Logging to console only.')

    log_level = logging.DEBUG if args.verbose else logging.WARN
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
    )
    # basicConfig() does nothing once the root logger has a handler, which
    # is the case after set_up_logging() succeeded. Set the level
    # explicitly so that --verbose also takes effect when logging to file.
    logging.getLogger().setLevel(log_level)

    # Keep noisy third-party loggers quiet regardless of --verbose.
    logging.getLogger('alembic').setLevel(logging.WARN)
    logging.getLogger('backoff').setLevel(logging.FATAL)
    logging.getLogger('urllib3').setLevel(logging.WARN)

    if args.version:
        print(f'commcare-export version {__version__}')
        sys.exit(0)

    if not args.project:
        error_msg = "commcare-export: error: argument --project is required"
        # output to log file through sys.stderr
        print(
            error_msg,
            file=sys.stderr
        )
        # Output to console for debugging
        print(error_msg)
        sys.exit(1)

    print("Running export...")
    try:
        exit_code = main_with_args(args)
        if exit_code > 0:
            print("Error occurred! See log file for error.")
        # SystemExit is not an Exception subclass, so it skips the handler
        # below but still passes through the ``finally`` clause.
        sys.exit(exit_code)
    except Exception:
        print("Error occurred! See log file for error.")
        raise
    finally:
        print("Export finished!")
5✔
281

282

283
def validate_output_filename(output_format, output_filename):
    """
    Check that the output file name suits the output format.

    csv output must end in ``.zip``, xls in ``.xls`` and xlsx in
    ``.xlsx``. Every other format except sql only needs the name to
    contain a dot.

    :returns: A list of error messages; empty when the name is valid.
    """
    # Formats with one mandatory suffix, and the complaint when it is absent.
    suffix_rules = {
        'csv': (
            '.zip',
            "For output format as csv, output file name should have extension zip",
        ),
        'xls': (
            '.xls',
            "For output format as xls, output file name should have extension xls",
        ),
        'xlsx': (
            '.xlsx',
            "For output format as xlsx, output file name should have extension xlsx",
        ),
    }

    rule = suffix_rules.get(output_format)
    if rule is not None:
        required_suffix, complaint = rule
        if not output_filename.endswith(required_suffix):
            return [complaint]
        # A name with the required suffix necessarily contains a dot, so
        # the generic check below could not fail either.
        return []

    if output_format != 'sql' and "." not in output_filename:
        return ["Missing extension in output file name"]
    return []
5✔
298

299

300
def _get_query(args, writer, column_enforcer=None):
    """
    Load the query named by ``args.query``.

    The writer supplies the multi-table, column-length and
    required-column settings that are forwarded to
    ``_get_query_from_file()``.
    """
    return _get_query_from_file(
        query_arg=args.query,
        missing_value=args.missing_value,
        combine_emits=writer.supports_multi_table_write,
        max_column_length=writer.max_column_length,
        required_columns=writer.required_columns,
        column_enforcer=column_enforcer,
        value_or_root=args.export_root_if_no_subdocument,
    )
310

311

312
def _get_query_from_file(
5✔
313
    query_arg,
314
    missing_value,
315
    combine_emits,
316
    max_column_length,
317
    required_columns,
318
    column_enforcer,
319
    value_or_root
320
):
321
    if os.path.exists(query_arg):
5✔
322
        if os.path.splitext(query_arg)[1] in ['.xls', '.xlsx']:
5✔
323
            import openpyxl
5✔
324
            workbook = openpyxl.load_workbook(query_arg)
5✔
325
            return excel_query.get_queries_from_excel(
5✔
326
                workbook,
327
                missing_value,
328
                combine_emits,
329
                max_column_length,
330
                required_columns,
331
                column_enforcer,
332
                value_or_root
333
            )
334
        else:
335
            with io.open(query_arg, encoding='utf-8') as fh:
×
336
                return MiniLinq.from_jvalue(json.loads(fh.read()))
×
337

338

339
def get_queries(args, writer, lp, column_enforcer=None):
    """
    Assemble the query to run from the command-line arguments.

    The file query (``--query``) comes first, followed by the built-in
    users query and the built-in locations query when requested;
    ``--with-organization`` requests both built-ins.

    :returns: The single query, or a ``List`` wrapping them when there is
        more than one.
    :raises MissingQueryFileException: If ``--query`` was given but no
        query could be loaded from it.
    """
    queries = []

    if args.query is not None:
        file_query = _get_query(args, writer, column_enforcer=column_enforcer)
        if not file_query:
            raise MissingQueryFileException(args.query)
        queries.append(file_query)

    # Add user data to query
    if args.users or args.with_organization:
        queries.append(builtin_queries.users_query)

    # Add location data to query
    if args.locations or args.with_organization:
        queries.append(builtin_queries.get_locations_query(lp))

    if len(queries) > 1:
        return List(queries)
    return queries[0]
5✔
357

358

359
def _get_writer(output_format, output, strict_types):
5✔
360
    if output_format == 'xlsx':
5✔
361
        return writers.Excel2007TableWriter(output)
×
362
    elif output_format == 'xls':
5✔
363
        return writers.Excel2003TableWriter(output)
×
364
    elif output_format == 'csv':
5✔
365
        if not output.endswith(".zip"):
×
366
            print(
×
367
                "WARNING: csv output is a zip file, but "
368
                f"will be written to {output}"
369
            )
370
            print(
×
371
                "Consider appending .zip to the file name to avoid confusion."
372
            )
373
        return writers.CsvTableWriter(output)
×
374
    elif output_format == 'json':
5✔
375
        return writers.JValueTableWriter()
5✔
376
    elif output_format == 'markdown':
×
377
        return writers.StreamingMarkdownTableWriter(sys.stdout)
×
378
    elif output_format == 'sql':
×
379
        # Output should be a connection URL. Writer had bizarre issues
380
        # so we use a full connection instead of passing in a URL or
381
        # engine.
382
        if output.startswith('mysql'):
×
383
            charset_split = output.split('charset=')
×
384
            if len(charset_split) > 1 and charset_split[1] != 'utf8mb4':
×
385
                raise Exception(
×
386
                    f"The charset '{charset_split[1]}' might cause problems with the export. "
387
                    f"It is recommended that you use 'utf8mb4' instead."
388
                )
389

390
        return writers.SqlTableWriter(output, strict_types)
×
391
    else:
392
        raise Exception(f"Unknown output format: {output_format}")
×
393

394

395
def get_date_params(args):
    """
    Parse ``args.since`` and ``args.until`` with ``dateutil``.

    :returns: ``(since, until)``; an element is None when the
        corresponding argument is empty or missing.
    """
    def _parse_optional(raw_value):
        return dateutil.parser.parse(raw_value) if raw_value else None

    return _parse_optional(args.since), _parse_optional(args.until)
5✔
399

400

401
def _get_api_client(args, commcarehq_base_url):
    """
    Create the ``CommCareHqClient`` for the given base URL, using the
    project, credentials, auth mode and API version from ``args``.
    """
    client_options = {
        'url': commcarehq_base_url,
        'project': args.project,
        'username': args.username,
        'password': args.password,
        'auth_mode': args.auth_mode,
        'version': args.api_version,
    }
    return CommCareHqClient(**client_options)
410

411

412
def _get_checkpoint_manager(args):
    """
    Return a checkpoint manager, or None when checkpointing is disabled.

    Checkpointing is disabled (with a logged warning) when the query is
    neither a built-in users/locations query nor an existing file, and
    when ``--since`` or ``--until`` is in use. Otherwise the manager is
    created and its checkpoint table is set up before it is returned.
    """
    if not (args.users or args.locations or os.path.exists(args.query)):
        logger.warning(
            "Checkpointing disabled for non builtin, "
            "non file-based query"
        )
        return None

    if args.since or args.until:
        logger.warning(
            "Checkpointing disabled when using '--since' or '--until'"
        )
        return None

    manager = get_checkpoint_manager(args)
    manager.create_checkpoint_table()
    return manager
×
428

429

430
def force_lazy_result(lazy_result):
    """
    Consume a lazily evaluated query result.

    A truthy ``RepeatableIterator`` is exhausted by building a list from
    it. Any other non-None result is iterated and each element is forced
    recursively. None is ignored.
    """
    if lazy_result is None:
        return

    if isinstance(lazy_result, RepeatableIterator):
        if lazy_result:
            # The list is discarded; only the iteration itself matters.
            list(lazy_result)
        return

    for nested_result in lazy_result:
        force_lazy_result(nested_result)
5✔
437

438

439
def evaluate_query(env, query):
    """
    Evaluate ``query`` in ``env`` and force its lazy result.

    :param env: Environment used as a context manager around the
        evaluation and passed to ``query.eval()``.
    :param query: Object whose ``eval(env)`` yields the lazy result.
    :returns: ``EXIT_STATUS_SUCCESS`` when the query ran to completion,
        ``EXIT_STATUS_ERROR`` after an authentication failure (HTTP 401),
        a ``ResourceRepeatException``, one of the handled SQLAlchemy
        errors, or a keyboard interrupt.
    :raises requests.exceptions.RequestException: For any request
        failure other than an HTTP 401 response.
    """
    with env:
        try:
            lazy_result = query.eval(env)
            force_lazy_result(lazy_result)
            return EXIT_STATUS_SUCCESS
        except requests.exceptions.RequestException as e:
            # Compare with None explicitly: a requests Response is falsy
            # for every 4xx/5xx status, so a plain truth test would never
            # let the 401 check run.
            if e.response is not None and e.response.status_code == 401:
                print(
                    "\nAuthentication failed. Please check your credentials.",
                    file=sys.stderr
                )
                return EXIT_STATUS_ERROR
            raise
        except ResourceRepeatException as e:
            print('Stopping because the export is stuck')
            print(e.message)
            print('Try increasing --batch-size to overcome the error')
            return EXIT_STATUS_ERROR
        except (
            sqlalchemy.exc.DataError,
            sqlalchemy.exc.InternalError,
            sqlalchemy.exc.ProgrammingError
        ) as e:
            print('Stopping because of database error:\n', e)
            return EXIT_STATUS_ERROR
        except KeyboardInterrupt:
            print('\nExport aborted', file=sys.stderr)
            return EXIT_STATUS_ERROR
×
469

470

471
def main_with_args(args):
    """
    Run one export as described by the parsed command-line ``args``.

    Creates the writer, prompts for a missing username or password,
    builds the query and the evaluation environment, evaluates the query
    and, for the 'json' output format, prints the collected tables.

    :param args: Namespace produced by the argument parser. ``username``
        and ``password`` are filled in on it when they had to be
        prompted for.
    :returns: ``EXIT_STATUS_SUCCESS`` or ``EXIT_STATUS_ERROR``.
    """
    logger.info(f"CommCare Export Version {__version__}")
    writer = _get_writer(args.output_format, args.output, args.strict_types)

    if args.query is None and args.users is False and args.locations is False:
        print(
            'At least one of the following arguments is required: '
            '--query, --users, --locations',
            file=sys.stderr
        )
        return EXIT_STATUS_ERROR

    if not args.username:
        logger.warning("Username not provided")
        args.username = input('Please provide a username: ')

    if not args.password:
        logger.warning("Password not provided")
        # Windows getpass does not accept unicode
        args.password = getpass.getpass()

    column_enforcer = None
    if args.with_organization:
        column_enforcer = builtin_queries.ColumnEnforcer()

    # --commcare-hq is either a known alias or the base URL itself.
    commcarehq_base_url = commcare_hq_aliases.get(
        args.commcare_hq, args.commcare_hq
    )
    api_client = _get_api_client(args, commcarehq_base_url)
    lp = LocationInfoProvider(api_client, page_size=args.batch_size)
    try:
        query = get_queries(args, writer, lp, column_enforcer)
    except DataExportException as e:
        print(e.message, file=sys.stderr)
        return EXIT_STATUS_ERROR

    if args.dump_query:
        # Only show the query; nothing is fetched or written.
        print(json.dumps(query.to_jvalue(), indent=4))
        return EXIT_STATUS_SUCCESS

    checkpoint_manager = None
    if writer.support_checkpoints:
        checkpoint_manager = _get_checkpoint_manager(args)

    since, until = get_date_params(args)
    if args.start_over:
        if checkpoint_manager:
            logger.warning(
                'Ignoring all checkpoints and re-fetching all data from '
                'CommCare.'
            )
    elif since:
        logger.debug('Starting from %s', args.since)

    cm = CheckpointManagerProvider(checkpoint_manager, since, args.start_over)
    static_env = {
        'commcarehq_base_url': commcarehq_base_url,
        'get_checkpoint_manager': cm.get_checkpoint_manager,
        'get_location_info': lp.get_location_info,
        'get_location_ancestor': lp.get_location_ancestor
    }
    env = (
        BuiltInEnv(static_env)
        | CommCareHqEnv(api_client, until=until, page_size=args.batch_size)
        | JsonPathEnv({})
        | EmitterEnv(writer)
    )

    exit_status = evaluate_query(env, query)

    if args.output_format == 'json':
        print(
            json.dumps(
                list(writer.tables.values()),
                indent=4,
                default=default_to_json
            )
        )

    return exit_status
5✔
551

552

553
def entry_point():
    """Run ``main()`` with the process arguments, minus the program name."""
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
×
555

556

557
# Run the command-line interface only when this file is executed as a
# script, not when it is imported.
if __name__ == '__main__':
    entry_point()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc