• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 7250412654

18 Dec 2023 03:44PM UTC coverage: 74.547% (+0.1%) from 74.452%
7250412654

push

github

web-flow
Merge branch 'dev' into retguard

4565 of 7244 branches covered (0.0%)

350 of 507 new or added lines in 17 files covered. (69.03%)

13 existing lines in 5 files now uncovered.

12843 of 17228 relevant lines covered (74.55%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.87
/pwnlib/adb/adb.py
1
"""Provides utilities for interacting with Android devices via the Android Debug Bridge.
2

3
Using Android Devices with Pwntools
4
-----------------------------------
5

6
Pwntools tries to be as easy as possible to use with Android devices.
7

8
If you have only one device attached, everything "just works".
9

10
If you have multiple devices, you have a handful of options to select one, or iterate
11
over the devices.
12

13
First and most important is the ``context.device`` property, which declares the "currently"
14
selected device in any scope.  It can be set manually to a serial number, or to a ``Device``
15
instance.
16

17
.. code-block:: python
18

19
    # Take the first available device
20
    context.device = adb.wait_for_device()
21

22
    # Set a device by serial number
23
    context.device = 'ZX1G22LH8S'
24

25
    # Set a device by its product name
26
    for device in adb.devices():
27
        if device.product == 'shamu':
28
            break
29
    else:
30
        error("Could not find any shamus!")
31

32
Once a device is selected, you can operate on it with any of the functions in
33
the :mod:`pwnlib.adb` module.
34

35
.. code-block:: python
36

37
    # Get a process listing
38
    print(adb.process(['ps']).recvall())
39

40
    # Fetch properties
41
    print(adb.properties.ro.build.fingerprint)
42

43
    # Read and write files
44
    print(adb.read('/proc/version'))
45
    adb.write('/data/local/tmp/foo', 'my data')
46

47
"""
48
from __future__ import absolute_import
1✔
49
from __future__ import division
1✔
50

51
import functools
1✔
52
import glob
1✔
53
import logging
1✔
54
import os
1✔
55
import re
1✔
56
import shutil
1✔
57
import six
1✔
58
import stat
1✔
59
import tempfile
1✔
60
import time
1✔
61

62
import dateutil.parser
1✔
63

64
from pwnlib import atexit
1✔
65
from pwnlib import tubes
1✔
66
from pwnlib.context import LocalContext
1✔
67
from pwnlib.context import context
1✔
68
from pwnlib.device import Device
1✔
69
from pwnlib.exception import PwnlibException
1✔
70
from pwnlib.log import getLogger
1✔
71
from pwnlib.protocols.adb import AdbClient
1✔
72
from pwnlib.util.packing import _decode
1✔
73
from pwnlib.util import misc
1✔
74

75
log = getLogger(__name__)
1✔
76

77
def adb(argv, *a, **kw):
    r"""Run an ADB subcommand and return everything it printed.

    Arguments:
        argv(list): Subcommand followed by its arguments.  A lone string
            is treated as a one-element argument list.
        *a: Forwarded to the process that gets spawned.
        **kw: Forwarded to the process that gets spawned.

    Returns:
        The complete output of the command.

    .. doctest::
       :skipif: skip_android

        >>> adb.adb('get-serialno')
        b'emulator-5554\n'
        >>> adb.adb(['shell', 'uname']) # it is better to use adb.process
        b'Linux\n'
    """
    if isinstance(argv, (bytes, six.text_type)):
        argv = [argv]

    log.debug("$ " + ' '.join(context.adb + argv))

    if argv[0] == 'shell':
        # All "adb shell" incantations should go through adb.process()
        tube = process(argv[1:], *a, **kw)
    else:
        tube = tubes.process.process(context.adb + argv, *a, **kw)

    return tube.recvall()
98

99
@context.quietfunc
def devices(serial=None):
    """Enumerate the devices known to the ADB server.

    Arguments:
        serial(str): Optional serial number to look for.

    Returns:
        A tuple of ``AdbDevice`` objects, one per reported device.  If
        ``serial`` matches one of them, that single device is returned
        instead of the tuple.
    """
    with AdbClient() as client:
        listing = client.devices(long=True)

    found = []

    for entry in listing.splitlines():
        # Skip the first 'List of devices attached' line, and the final empty line.
        is_header = 'List of devices' in entry
        if is_header or not entry.strip():
            continue

        candidate = AdbDevice.from_adb_output(entry)
        if candidate.serial == serial:
            return candidate

        found.append(candidate)

    return tuple(found)
116

117
def current_device(any=False):
    """Look up the ``AdbDevice`` matching ``context.device``.

    Arguments:
        any(bool): If true, return the first device reported instead of
            requiring a match with ``context.device``.

    Returns:
        The matching ``AdbDevice``, or ``None`` when nothing matches.

    .. doctest::
       :skipif: skip_android

        >>> device = adb.current_device(any=True)
        >>> device  # doctest: +ELLIPSIS
        AdbDevice(serial='emulator-5554', type='device', port='emulator', product='sdk_...phone_...', model='...', device='generic...')
        >>> device.port
        'emulator'
    """
    for candidate in devices():
        if any or candidate == context.device:
            return candidate

    return None
134

135
def with_device(f):
    """Decorator that makes sure a device is selected before ``f`` runs.

    If ``context.device`` is unset, the first device reported by
    :func:`current_device` is selected (with a one-time warning).  If there
    is still no device afterwards, an error is logged.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if not context.device:
            fallback = current_device(any=True)
            if fallback:
                log.warn_once('Automatically selecting device %s' % fallback)
                context.device = fallback

        # Re-check: the fallback lookup above may have found nothing
        if not context.device:
            log.error('No devices connected, cannot invoke %s.%s' % (f.__module__, f.__name__))

        return f(*args, **kwargs)

    return wrapper
147

148

149
@with_device
def root():
    """Restarts adbd as root.

    Does nothing if adbd already runs as root.  If adbd reports a restart
    (or gives an empty reply), waits for the device to come back.  Any
    other reply is logged as an error.

    .. doctest::
       :skipif: skip_android

        >>> adb.root()
    """
    log.info("Enabling root on %s" % context.device)

    with context.quiet:
        with AdbClient() as client:
            response = client.root()

    if 'already running as root' in response:
        return

    # A non-empty reply that is not the restart notice is a failure
    if response and 'restarting adbd as root' not in response:
        log.error("Could not run as root:\n%s" % response)
        return

    with context.quiet:
        wait_for_device()
173

174
def no_emulator(f):
    """Decorator that logs an error when ``f`` is invoked while the
    current device is an emulator."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        selected = current_device()
        if selected and selected.port == 'emulator':
            log.error("Cannot invoke %s.%s on an emulator." % (f.__module__, f.__name__))
        return f(*args, **kwargs)

    return wrapper
182

183
@no_emulator
@with_device
def reboot(wait=True):
    """Reboots the device.

    Arguments:
        wait(bool): If true (the default), block until the device is
            available again.
    """
    log.info('Rebooting device %s' % context.device)

    with AdbClient() as client:
        client.reboot()

    if not wait:
        return

    wait_for_device()
195

196
@no_emulator
@with_device
def reboot_bootloader():
    """Reboots the device to the bootloader.

    Unlike :func:`reboot`, this does not wait for the device afterwards.
    """
    log.info('Rebooting %s to bootloader' % context.device)

    with AdbClient() as client:
        client.reboot_bootloader()
205

206
@with_device
def uptime():
    """uptime() -> float

    Reads ``/proc/uptime`` from the device and parses its two fields.

    Returns:
        Uptime of the device, in seconds

    Example:

    .. doctest::
       :skipif: skip_android

        >>> adb.uptime() > 3 # normally AVD takes ~7 seconds to boot
        True
    """
    fields = [float(field) for field in read('/proc/uptime').split()]

    # Exactly two values are expected; only the first one is returned
    up, _idle = fields
    return up
223

224
@with_device
def boot_time():
    """boot_time() -> int

    Scans ``/proc/stat`` on the device for the ``btime`` entry.

    Returns:
        Boot time of the device, in Unix time, rounded to the
        nearest second.  ``None`` if no ``btime`` entry is found.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> import time
        >>> adb.boot_time() < time.time()
        True
    """
    for entry in read('/proc/stat').splitlines():
        key, remainder = entry.split(None, 1)
        if key == b'btime':
            return int(remainder)

    return None
245

246
class AdbDevice(Device):
    """Encapsulates information about a connected device.

    The ``arch``, ``bits``, ``endian`` and ``avd`` properties are resolved
    lazily, on first access, by querying the device.

    Any attribute that is not found on the instance is looked up in this
    module's globals; callables found that way are wrapped so that they run
    with ``context.device`` set to this device.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> device = adb.wait_for_device()
        >>> device.arch
        'amd64'
        >>> device.bits
        64
        >>> device.os
        'android'
        >>> device.product  # doctest: +ELLIPSIS
        'sdk_...phone_...'
        >>> device.serial
        'emulator-5554'
    """
    def __init__(self, serial, type, port=None, product='unknown', model='unknown', device='unknown', features=None, **kw):
        """Store the fields reported by ``adb devices -l``.

        Arguments:
            serial(str): Serial number of the device.
            type(str): Second column of the device listing (e.g. ``device``,
                ``offline``, ``unauthorized``).
            port(str): Port description; ``'emulator'`` for emulators.
            product(str): Product name.
            model(str): Model name; underscores are replaced by spaces.
            device(str): Device name.
            features: Accepted for compatibility with the listing, unused.
            **kw: Any other ``key:value`` fields from the listing, ignored.
        """
        self.serial  = serial
        self.type    = type
        self.port    = port
        self.product = product
        self.model   = model.replace('_', ' ')
        self.device  = device
        self.os      = 'android'

        # Devices that did not report a product get no deferred fields;
        # they are left unset for such devices.
        if product == 'unknown':
            return

        # Deferred fields
        self._initialized = False
        self._arch = None
        self._bits = None
        self._endian = None
        self._avd = None

    @property
    def arch(self):
        """Architecture of the device, resolved on first access."""
        self.__do_deferred_initialization()
        return self._arch

    @property
    def avd(self):
        """AVD name reported by the emulator console, or ``None``."""
        self.__do_deferred_initialization()
        return self._avd

    @property
    def bits(self):
        """Word size of the device, resolved on first access."""
        self.__do_deferred_initialization()
        return self._bits

    @property
    def endian(self):
        """Endianness of the device, resolved on first access."""
        self.__do_deferred_initialization()
        return self._endian


    def __do_deferred_initialization(self):
        """Query the device once to fill in the deferred fields."""
        if self._initialized:
            return

        with context.local(device=self.serial):
            abi = getprop('ro.product.cpu.abi')
            context.clear()
            # Let the context translate the ABI string into arch/bits/endian
            context.arch = str(abi)
            self._arch = context.arch
            self._bits = context.bits
            self._endian = context.endian

        if self.port == 'emulator':
            emulator, port = self.serial.split('-')
            port = int(port)
            # Best effort: ask the emulator console for the AVD name.
            # NOTE(review): ``remote`` is not imported in the visible part of
            # this module -- confirm it is in scope, otherwise the resulting
            # NameError is swallowed below and ``avd`` stays ``None``.
            try:
                with remote('localhost', port, level='error') as r:
                    r.recvuntil('OK')
                    r.recvline() # Rest of the line
                    r.sendline('avd name')
                    # Store into the backing field: ``avd`` is a read-only
                    # property, so assigning to it would raise AttributeError.
                    self._avd = r.recvline().strip()
            except Exception:
                pass

        self._initialized = True

    def __str__(self):
        """Return the serial number."""
        return self.serial

    def __repr__(self):
        """Return ``ClassName(field=value, ...)`` for the listing fields."""
        fields = ['serial', 'type', 'port', 'product', 'model', 'device']
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join(('%s=%r' % (field, getattr(self, field)) for field in fields)))

    @staticmethod
    def from_adb_output(line):
        """Build an ``AdbDevice`` from one line of ``adb devices -l`` output.

        Example output:
        ZX1G22LM7G             device usb:336789504X product:shamu model:Nexus_6 device:shamu features:cmd,shell_v2
        84B5T15A29020449       device usb:336855040X product:angler model:Nexus_6P device:angler
        0062741b0e54b353       unauthorized usb:337641472X
        emulator-5554          offline
        emulator-5554          device product:sdk_phone_armv7 model:sdk_phone_armv7 device:generic
        """
        fields = line.split()

        serial = fields[0]
        state  = fields[1]
        kwargs = {}

        if serial.startswith('emulator-'):
            kwargs['port'] = 'emulator'

        for field in fields[2:]:
            key, separator, value = field.partition(':')
            # Ignore tokens that are not key:value pairs instead of crashing
            if separator:
                kwargs[key] = value

        return AdbDevice(serial, state, **kwargs)

    def __wrapped(self, function):
        """Wraps a callable in a scope which selects the current device."""
        @functools.wraps(function)
        def wrapper(*a, **kw):
            with context.local(device=self):
                return function(*a,**kw)
        return wrapper

    def __getattr__(self, name):
        """Provides scoped access to ``adb`` module properties, in the context
        of this device.

        Names starting with an underscore are never forwarded.

        .. doctest::
           :skipif: skip_android

            >>> property = 'ro.build.fingerprint'
            >>> device = adb.wait_for_device()
            >>> adb.getprop(property) == device.getprop(property)
            True
        """
        if name.startswith('_'):
            raise AttributeError(name)

        with context.local(device=self):
            g = globals()

            if name not in g:
                raise AttributeError('%r object has no attribute %r' % (type(self).__name__,name))
            value = g[name]

        if not hasattr(value, '__call__'):
            return value

        return self.__wrapped(value)
402

403
@LocalContext
def wait_for_device(kick=False):
    """Block until a device is connected.

    The device waited for is the one selected via ``context.device``.
    Set ``context.device`` to wait for a specific device, or clear it to
    wait for *any* device.

    Arguments:
        kick(bool): If true, ask the ADB server to reconnect first.
            Failures of that request are ignored.

    Return:
        An ``AdbDevice`` instance for the device.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> device = adb.wait_for_device()
    """
    with log.waitfor("Waiting for device to come online") as progress:
        with AdbClient() as client:
            if kick:
                try:
                    client.reconnect()
                except Exception:
                    pass

            serial = str(context.device) if context.device else ''

        with AdbClient() as client:
            client.wait_for_device(serial)

        for device in devices():
            if context.device == device:
                return device

            # No particular device requested: settle for the first one
            if not serial:
                break
        else:
            log.error("Could not find any devices")

        with context.local(device=device):
            # There may be multiple devices, so context.device is
            # insufficient.  Pick the first device reported.
            summary = (device, product(), build(), _build_date())
            progress.success('%s (%s %s %s)' % summary)

            return context.device
454

455
@with_device
def disable_verity():
    """Disables dm-verity on the device.

    Restarts adbd as root first.  Reboots the device (and waits for it)
    when adbd asks for a reboot; logs an error on an unrecognized reply.
    """
    with log.waitfor("Disabling dm-verity on %s" % context.device):
        root()

        with AdbClient() as client:
            response = client.disable_verity()

        if 'Verity already disabled' in response:
            return

        if 'Now reboot your device' in response:
            reboot(wait=True)
            return

        if '0006closed' in response:
            # Emulator doesnt support Verity?
            return

        log.error("Could not disable verity:\n%s" % response)
472

473
@with_device
def remount():
    """Remounts the filesystem as writable.

    Disables dm-verity and restarts adbd as root beforehand.  Logs an error
    unless the reply contains ``remount succeeded``.
    """
    with log.waitfor("Remounting filesystem on %s" % context.device):
        disable_verity()
        root()

        with AdbClient() as client:
            response = client.remount()

        succeeded = 'remount succeeded' in response
        if not succeeded:
            log.error("Could not remount filesystem:\n%s" % response)
485

486
@with_device
def unroot():
    """Restarts adbd as AID_SHELL.

    Logs an error unless adbd confirms the restart.  A bare ``0006closed``
    reply is accepted silently.
    """
    log.info("Unrooting %s" % context.device)

    with context.quiet:
        with AdbClient() as client:
            response = client.unroot()

    # Emulator doesnt care
    if response == '0006closed':
        return

    confirmed = 'restarting adbd as non root' in response
    if not confirmed:
        log.error("Could not unroot:\n%s" % response)
499

500
def _create_adb_push_pull_callback(w):
    """Build a transfer callback that reports progress through ``w``.

    Arguments:
        w: Progress object; its ``status`` method receives the text.

    Returns:
        A callback ``(filename, data, size, chunk, chunk_size)`` that
        always returns ``True``.
    """
    def callback(filename, data, size, chunk, chunk_size):
        transferred = len(data) + len(chunk)

        if size != 0:
            percent = int(100 * transferred // size)
            total = misc.size(size)
        else:
            # Total size unknown, so neither total nor percentage can be shown
            total = '???'
            percent = '???'

        w.status('%s/%s (%s%%)' % (misc.size(transferred), total, percent))
        return True

    return callback
513

514
@with_device
def pull(remote_path, local_path=None):
    """Download a file from the device.

    Arguments:
        remote_path(str): Path or directory of the file on the device.
        local_path(str): Path to save the file to.
            Defaults to the base name of ``remote_path``.

    Return:
        The contents of the file.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> _=adb.pull('/proc/version', './proc-version')
        >>> print(read('./proc-version').decode('utf-8')) # doctest: +ELLIPSIS
        Linux version ...
    """
    if local_path is None:
        local_path = os.path.basename(remote_path)

    description = "Pulling %r to %r" % (remote_path, local_path)

    # Only mention the device when debugging
    if log.isEnabledFor(logging.DEBUG):
        description += ' (%s)' % context.device

    with log.waitfor(description) as progress:
        reporter = _create_adb_push_pull_callback(progress)
        contents = read(remote_path, callback=reporter)
        misc.write(local_path, contents)

    return contents
548

549
@with_device
def push(local_path, remote_path):
    """Upload a file to the device.

    If ``remote_path`` is an existing directory, the file is stored inside
    it under the local file's base name.  If ``remote_path`` does not
    exist, its parent directory must.

    Arguments:
        local_path(str): Path to the local file to push.
        remote_path(str): Path or directory to store the file on the device.

    Returns:
        Remote path of the file.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> write('./filename', 'contents')
        >>> adb.push('./filename', '/data/local/tmp')
        '/data/local/tmp/filename'
        >>> adb.read('/data/local/tmp/filename')
        b'contents'
        >>> adb.push('./filename', '/does/not/exist')
        Traceback (most recent call last):
        ...
        PwnlibException: Could not stat '/does/not/exist'
    """
    description = "Pushing %r to %r" % (local_path, remote_path)
    target_name = os.path.basename(local_path)

    if log.isEnabledFor(logging.DEBUG):
        description += ' (%s)' % context.device

    with log.waitfor(description) as progress:
        with AdbClient() as client:

            # We need to discover whether remote_path is a directory or not.
            # If we cannot stat the full path, assume it's a path-plus-filename,
            # where the filename does not exist.
            info = client.stat(remote_path)
            if not info:
                target_name = os.path.basename(remote_path)
                remote_path = os.path.dirname(remote_path)
                info = client.stat(remote_path)

            # If we can't find the exact path, or its parent directory, bail!
            if not info:
                log.error('Could not stat %r' % remote_path)

            # If we found the parent directory, append the filename
            if stat.S_ISDIR(info['mode']):
                remote_path = os.path.join(remote_path, target_name)

            client.write(remote_path,
                         misc.read(local_path),
                         callback=_create_adb_push_pull_callback(progress))

    return remote_path
607

608
@context.quietfunc
@with_device
def read(path, target=None, callback=None):
    """Download a file from the device, and extract its contents.

    Arguments:
        path(str): Path to the file on the device.
        target(str): Optional, local path to also store the contents in.
        callback(callable): See the documentation for
            ``adb.protocol.AdbClient.read``.

    Returns:
        The contents of the file.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> print(adb.read('/proc/version').decode('utf-8')) # doctest: +ELLIPSIS
        Linux version ...
        >>> adb.read('/does/not/exist')
        Traceback (most recent call last):
        ...
        PwnlibException: Could not stat '/does/not/exist'
    """
    with AdbClient() as client:
        info = client.stat(path)
        if not info:
            log.error('Could not stat %r' % path)

        contents = client.read(path, info['size'], callback=callback)

    if target:
        misc.write(target, contents)

    return contents
642

643
@context.quietfunc
@with_device
def write(path, data=b''):
    """Create a file on the device with the provided contents.

    The data is staged in a local temporary file, which is then pushed
    to the device.

    Arguments:
        path(str): Path to the file on the device
        data(str): Contents to store in the file

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.write('/dev/null', b'data')
        >>> adb.write('/data/local/tmp/')
    """
    with tempfile.NamedTemporaryFile() as staging:
        staging_path = staging.name
        misc.write(staging_path, data)
        push(staging_path, path)
663

664
@context.quietfunc
@with_device
def mkdir(path):
    """Create a directory on the target device.

    Note:
        Silently succeeds if the directory already exists.

    Arguments:
        path(str): Directory to create.  Must be an absolute path.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.mkdir('/')

        >>> path = '/data/local/tmp/mkdir_test'
        >>> adb.exists(path)
        False
        >>> adb.mkdir(path)
        >>> adb.exists(path)
        True

        >>> adb.mkdir('/init')
        Traceback (most recent call last):
        ...
        PwnlibException: mkdir failed for /init, File exists
    """
    if not path.startswith('/'):
        log.error("Must provide an absolute path: %r" % path)

    with AdbClient() as client:
        info = client.stat(path)

        # Don't re-create existing directories
        already_there = info and stat.S_ISDIR(info['mode'])
        if already_there:
            return

        output = process(['mkdir', path]).recvall()

        # Any output at all is an error
        if output:
            log.error(output.rstrip().decode('utf-8'))
709

710
@context.quietfunc
@with_device
def makedirs(path):
    """Create a directory and all parent directories on the target device.

    Note:
        Silently succeeds if the directory already exists.

    Arguments:
        path(str): Directory to create.  Must be an absolute path.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.makedirs('/data/local/tmp/this/is/a/directory/hierarchy')
        >>> adb.listdir('/data/local/tmp/this/is/a/directory')
        ['hierarchy']
    """
    # Reject relative paths up front, with the same message mkdir() uses.
    # Walking up a relative path ends at '' rather than '/', which would
    # otherwise recurse without bound.
    if not path.startswith('/'):
        log.error("Must provide an absolute path: %r" % path)

    # dirname() is a fixed point at the root ('/' and also '//'), so stop
    # as soon as walking up no longer changes the path.
    parent = os.path.dirname(path)
    if parent != path:
        makedirs(parent)

    mkdir(path)
731

732
@context.quietfunc
@with_device
def exists(path):
    """Return :const:`True` if ``path`` can be stat'ed on the target device.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.exists('/')
        True
        >>> adb.exists('/etc/hosts')
        True
        >>> adb.exists('/does/not/exist')
        False
    """
    with AdbClient() as client:
        info = client.stat(path)
        return bool(info)
751

752
@context.quietfunc
@with_device
def isdir(path):
    """Return :const:`True` if ``path`` is a directory on the target device.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.isdir('/')
        True
        >>> adb.isdir('/init')
        False
        >>> adb.isdir('/does/not/exist')
        False
    """
    with AdbClient() as client:
        info = client.stat(path)
        if not info:
            return False
        return bool(stat.S_ISDIR(info['mode']))
772

773
@context.quietfunc
@with_device
def unlink(path, recursive=False):
    """Unlinks a file or directory on the target device.

    Arguments:
        path(str): File or directory to remove.
        recursive(bool): Required to remove a non-empty directory.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.unlink("/does/not/exist")
        Traceback (most recent call last):
        ...
        PwnlibException: Could not unlink '/does/not/exist': Does not exist

        >>> filename = '/data/local/tmp/unlink-test'
        >>> adb.write(filename, 'hello')
        >>> adb.exists(filename)
        True
        >>> adb.unlink(filename)
        >>> adb.exists(filename)
        False

        >>> adb.mkdir(filename)
        >>> adb.write(filename + '/contents', 'hello')
        >>> adb.unlink(filename)
        Traceback (most recent call last):
        ...
        PwnlibException: Cannot delete non-empty directory '/data/local/tmp/unlink-test' without recursive=True

        >>> adb.unlink(filename, recursive=True)
        >>> adb.exists(filename)
        False
    """
    with AdbClient() as client:
        if not client.stat(path):
            log.error("Could not unlink %r: Does not exist" % path)

        # If the directory is not empty, do not delete it
        if isdir(path) and client.list(path) and not recursive:
            log.error("Cannot delete non-empty directory %r without recursive=True" % path)

        if recursive:
            flags = '-rf'
        else:
            flags = '-f'

        output = client.execute(['rm', flags, path]).recvall()

        # Anything rm printed is reported as an error
        if output:
            log.error(output.decode('utf-8'))
822

823
@with_device
def process(argv, *a, **kw):
    """Execute a process on the device.

    See :class:`pwnlib.tubes.process.process` documentation for more info.

    Arguments:
        argv(list): Command and arguments.  A lone string is treated as
            a one-element argument list.
        *a: Accepted, but not used by this function.
        **kw: Accepted, but not used by this function.

    Returns:
        A :class:`pwnlib.tubes.process.process` tube.

    Examples:

    .. doctest::
       :skipif: skip_android

        >>> adb.root()
        >>> print(adb.process(['cat','/proc/version']).recvall().decode('utf-8')) # doctest: +ELLIPSIS
        Linux version ...
    """
    if isinstance(argv, (bytes, six.text_type)):
        argv = [argv]

    message = "Starting %s process %r" % ('Android', argv[0])

    # When debugging, show the full argv if there is more than the command
    if log.isEnabledFor(logging.DEBUG) and argv != [argv[0]]:
        message += ' argv=%r ' % argv

    with log.progress(message) as p:
        return AdbClient().execute(argv)
851

852
@with_device
def interactive(**kw):
    """Spawns an interactive shell.

    Keyword arguments are passed on to :func:`shell`.
    """
    session = shell(**kw)
    return session.interactive()
856

857
@with_device
def shell(**kw):
    """Returns an interactive shell.

    Runs ``sh -i`` through :func:`process`, passing on keyword arguments.
    """
    argv = ['sh', '-i']
    return process(argv, **kw)
861

862
@with_device
def which(name, all = False, *a, **kw):
    """Retrieves the full path to a binary in ``$PATH`` on the device

    Arguments:
        name(str): Binary name
        all(bool): Whether to return all paths, or just the first
        *a: Additional arguments for :func:`.adb.process`
        **kw: Additional arguments for :func:`.adb.process`

    Returns:
        Either a path, or list of paths.  With ``all=False``, ``None`` is
        returned if nothing was found.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> adb.which('sh')
        '/system/bin/sh'
        >>> adb.which('sh', all=True)
        ['/system/bin/sh']

        >>> adb.which('foobar') is None
        True
        >>> adb.which('foobar', all=True)
        []
    """
    # Unfortunately, there is no native 'which' on many phones.
    # The script prints every hit followed by a NUL separator.
    script = '''
(IFS=:
  for directory in $PATH; do
      [ -x "$directory/{name}" ] || continue;
      echo -n "$directory/{name}\\x00";
  done
)
[ -x "{name}" ] && echo -n "$PWD/{name}\\x00"
'''.format(name=name).strip()

    output = process(['sh','-c', script], *a, **kw).recvall()

    # Split on the separator and drop empty entries
    hits = [entry for entry in _decode(output).split('\x00') if entry]

    if all:
        return hits

    return hits[0] if hits else None
922

923

924
@with_device
def whoami():
    """Returns current shell user

    The value is whatever ``echo $USER`` prints in a shell on the device,
    with surrounding whitespace stripped.

    Example:

    .. doctest::
       :skipif: skip_android

       >>> adb.whoami()
       b'root'
    """
    output = process(['sh','-ic','echo $USER']).recvall()
    return output.strip()
937

938
@with_device
def forward(port):
    """Sets up a port to forward to the device.

    The same TCP port number is used on both ends.  Removal of the forward
    is registered to run at exit.

    Arguments:
        port(int): TCP port to forward.
    """
    endpoint = 'tcp:%s' % port

    def remove_forward():
        adb(['forward', '--remove', endpoint])

    adb(['forward', endpoint, endpoint])
    atexit.register(remove_forward)
944

945
@context.quietfunc
@with_device
def logcat(stream=False):
    """Reads the system log file.

    By default, causes logcat to exit after reading the file.

    Arguments:
        stream(bool): If :const:`True`, the contents are streamed rather than
            read in a one-shot manner.  Default is :const:`False`.

    Returns:
        If ``stream`` is :const:`False`, returns a string containing the log data.
        Otherwise, it returns a :class:`pwnlib.tubes.tube.tube` connected to the log output.
    """
    if not stream:
        # '-d' makes logcat dump the log and exit
        return process(['logcat', '-d']).recvall()

    return process(['logcat'])
965

966
@with_device
def pidof(name):
    """Returns a list of PIDs for the named process.

    Arguments:
        name(str): Name of the process to look up.

    Returns:
        A list of integer PIDs; empty if no output was produced.
    """
    with context.quiet:
        # Older devices have a broken 'pidof', apparently.
        # Try pgrep first.
        data = process(['pgrep', name]).recvall()

        # recvall() yields bytes, so the marker must be bytes as well;
        # testing a str against bytes raises TypeError on Python 3.
        if b'not found' in data:
            data = process(['pidof', name]).recvall()

    return list(map(int, data.split()))
980

981
@with_device
def proc_exe(pid):
    """Returns the full path of the executable for the provided PID.

    Resolved by running ``realpath`` on ``/proc/<pid>/exe`` on the device.

    Arguments:
        pid(int): Process ID to look up.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> adb.proc_exe(1)
        b'/init'
    """
    link = '/proc/%d/exe' % pid

    with context.quiet:
        resolved = process(['realpath', link]).recvall().strip()

    return resolved
997

998
@with_device
def getprop(name=None):
    """Reads properties from the system property store.

    Arguments:
        name(str): Optional, read a single property.

    Returns:
        If ``name`` is not specified, a ``dict`` of all properties is returned.
        Values consisting only of digits are converted to ``int``.
        Otherwise, a string is returned with the contents of the named property.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> adb.getprop() # doctest: +ELLIPSIS
        {...}
    """
    with context.quiet:
        if name:
            result = process(['getprop', name]).recvall().strip()
            return _decode(result)

        result = process(['getprop']).recvall()

    result = _decode(result)

    # Each property is printed as "[key]: [value]"
    pattern = re.compile(r'\[([^\]]+)\]: \[(.*)\]')

    props = {}
    for line in result.splitlines():
        if not line.startswith('['):
            continue

        match = pattern.search(line)
        if match is None:
            # Not a complete "[key]: [value]" line (for instance a value
            # that spans several lines); skip it rather than crash.
            continue

        key, value = match.groups()

        if value.isdigit():
            value = int(value)

        props[key] = value

    return props
1041

1042
@with_device
def setprop(name, value):
    """Runs ``setprop name value`` on the device.

    Returns:
        The whitespace-stripped output of the ``setprop`` command.
    """
    io = process(['setprop', name, value])
    output = io.recvall()
    return output.strip()
×
1046

1047
@with_device
def listdir(directory='/'):
    """Returns a sorted list of the entries in the provided directory.

    Note:
        This uses the SYNC LIST functionality, which runs in the adbd
        SELinux context.  If adbd is running in the su domain ('adb root'),
        this behaves as expected.

        Otherwise, fewer files may be returned due to restrictive SELinux
        policies on adbd.
    """
    entries = AdbClient().list(directory)
    return sorted(entries)
×
1060

1061
@with_device
def fastboot(args, *a, **kw):
    """Runs ``fastboot`` against the currently selected device.

    Arguments:
        args(list): Arguments placed after ``fastboot -s <device>``.
        a, kw: Passed through to the process constructor.

    Returns:
        The command output.
    """
    argv = ['fastboot', '-s', str(context.device)]
    argv.extend(args)

    io = tubes.process.process(argv, *a, **kw)
    return io.recvall()
×
1070

1071
@with_device
def fingerprint():
    """Returns the value of the ``ro.build.fingerprint`` property."""
    prop = 'ro.build.fingerprint'
    return getprop(prop)
×
1075

1076
@with_device
def product():
    """Returns the value of the ``ro.build.product`` property."""
    prop = 'ro.build.product'
    return getprop(prop)
×
1080

1081
@with_device
def build():
    """Returns the value of the ``ro.build.id`` property."""
    prop = 'ro.build.id'
    return getprop(prop)
×
1085

1086
@with_device
@no_emulator
def unlock_bootloader():
    """Unlocks the bootloader of the device.

    The device is rebooted into the bootloader, ``fastboot oem unlock`` is
    issued if it is not already unlocked, and the boot is then continued.

    Note:
        This requires physical interaction with the device.
    """
    w = log.waitfor("Unlocking bootloader")
    with w:
        if getprop('ro.oem_unlock_supported') == '0':
            log.error("Bootloader cannot be unlocked: ro.oem_unlock_supported=0")

        if getprop('ro.boot.oem_unlock_support') == '0':
            log.error("Bootloader cannot be unlocked: ro.boot.oem_unlock_support=0")

        if getprop('sys.oem_unlock_allowed') == '0':
            # No extra positional argument here: the message has no format
            # placeholder, so passing one breaks the message formatting.
            log.error("Bootloader cannot be unlocked: Enable OEM Unlock in developer settings first")

        AdbClient().reboot_bootloader()

        # Check to see if it's unlocked before attempting unlock.
        # fastboot() returns the raw recvall() data, so compare against bytes.
        unlocked = fastboot(['getvar', 'unlocked'])
        if b'unlocked: yes' in unlocked:
            w.success("Already unlocked")
            fastboot(['continue'])
            return

        fastboot(['oem', 'unlock'])
        unlocked = fastboot(['getvar', 'unlocked'])

        # Resume booting before reporting the outcome
        fastboot(['continue'])

        if b'unlocked: yes' not in unlocked:
            log.error("Unlock failed")
×
1121

1122
class Kernel(object):
    """Accessors for kernel information of the currently selected device."""

    # Cached text of /proc/kallsyms, filled in on first access
    _kallsyms = None

    @property
    def address(self):
        """Returns kernel address
        Example:

        .. doctest::
           :skipif: skip_android

            >>> hex(adb.kernel.address) # doctest: +ELLIPSIS
            '0x...000'
        """
        return self.symbols['_text']

    @property
    @context.quietfunc
    def symbols(self):
        """Returns a dictionary of kernel symbols, mapping name to address"""
        result = {}
        for line in self.kallsyms.splitlines():
            fields = line.split()
            if not fields:
                # Tolerate blank lines instead of failing on fields[0]
                continue
            address = int(fields[0], 16)
            name    = fields[-1]
            result[name] = address
        return result

    @property
    @context.quietfunc
    def kallsyms(self):
        """Returns the raw output of kallsyms

        The contents are read once and cached on the instance.
        """
        if not self._kallsyms:
            root()
            write('/proc/sys/kernel/kptr_restrict', '1')
            self._kallsyms = read('/proc/kallsyms').decode('ascii')
        return self._kallsyms

    @property
    @context.quietfunc
    def version(self):
        """Returns the kernel version of the device."""
        root()
        return read('/proc/version').strip()

    @property
    @context.quietfunc
    def cmdline(self):
        """Returns the stripped contents of ``/proc/cmdline``."""
        root()
        return read('/proc/cmdline').strip()

    @property
    @context.quietfunc
    def lastmsg(self):
        """Returns the previous kernel log, if the device exposes one.

        ``/proc/last_kmsg`` is preferred, then
        ``/sys/fs/pstore/console-ramoops``.  :const:`None` is returned when
        neither file is listed.
        """
        root()
        if 'last_kmsg' in listdir('/proc'):
            return read('/proc/last_kmsg')

        if 'console-ramoops' in listdir('/sys/fs/pstore/'):
            return read('/sys/fs/pstore/console-ramoops')

    def enable_uart(self):
        """Reboots the device with kernel logging to the UART enabled."""
        model = getprop('ro.product.model')

        # Per-model fastboot command; None means no command is needed
        known_commands = {
            'Nexus 4': None,
            'Nexus 5': None,
            'Nexus 6': 'oem config console enable',
            'Nexus 5X': None,
            'Nexus 6P': 'oem uart enable',
            'Nexus 7': 'oem uart-on',
        }

        # Keep a handle on the progress logger so that success() can be
        # reported on it below.
        w = log.waitfor('Enabling kernel UART')
        with w:

            if model not in known_commands:
                log.error("Device UART is unsupported.")

            command = known_commands[model]

            if command is None:
                w.success('Always enabled')
                return

            # Check the current commandline, it may already be enabled.
            # The command line is read as bytes, so match a bytes prefix.
            if any(s.startswith(b'console=tty') for s in self.cmdline.split()):
                w.success("Already enabled")
                return

            with context.local(device=context.device):
                reboot_bootloader()

                # Wait for device to come online.  fastboot() returns bytes,
                # so the serial is encoded before the membership test.
                serial = str(context.device).encode('utf-8')
                while serial not in fastboot(['devices', '-l']):
                    time.sleep(0.5)

                # Try the 'new' way
                fastboot(command.split())
                fastboot(['continue'])
                wait_for_device()


kernel = Kernel()
1✔
1231

1232
class Property(object):
    """Attribute-style access to the system property store.

    Attribute access builds up a dotted property name
    (``properties.ro.build.id``); the value is only fetched when the object
    is converted with ``str()`` or compared against a string.  Assigning to
    an attribute writes the corresponding property.
    """
    def __init__(self, name=None):
        # Need to avoid overloaded setattr() so we go through __dict__
        self.__dict__['_name'] = name

    def __str__(self):
        return str(getprop(self._name)).strip()

    def __getattr__(self, attr):
        # Private/dunder lookups must fail normally instead of creating
        # a nested Property.
        if attr.startswith('_'):
            raise AttributeError(attr)
        if self._name:
            attr = '%s.%s' % (self._name, attr)
        return Property(attr)

    def __setattr__(self, attr, value):
        if attr in self.__dict__:
            return super(Property, self).__setattr__(attr, value)

        if self._name:
            attr = '%s.%s' % (self._name, attr)
        setprop(attr, value)

    def __eq__(self, other):
        """Allow simple comparison

        Example:

        .. doctest::
           :skipif: skip_android

            >>> adb.properties.ro.build.version.sdk == "24"
            True
        """
        if isinstance(other, six.string_types):
            return str(self) == other
        return super(Property, self).__eq__(other)

    def __hash__(self):
        # Allow hash indices matching on the property.
        # The hash protocol calls __hash__ with no extra arguments.
        return hash(self._name)

properties = Property()
1✔
1275

1276
def _build_date():
    """Returns the ``ro.build.date`` property reformatted with ``%Y-%b-%d``.

    The month is rendered by ``strftime('%b')``, i.e. as an abbreviated
    month name rather than a number.
    """
    parsed = dateutil.parser.parse(getprop('ro.build.date'))
    return parsed.strftime('%Y-%b-%d')
×
1281

1282
def find_ndk_project_root(source):
    '''Given a path, find the closest enclosing NDK project root.

    Starting at ``source`` and walking up through its parents, the first
    directory containing a ``jni`` entry is returned.  The filesystem root
    itself is never considered.

    tl;dr "foo/bar/jni/baz.cpp" ==> "foo/bar"

    Arguments:
        source(str): Path to a source file or directory.

    Returns:
        The absolute path of the project root, or :const:`None` if no
        enclosing directory contains ``jni``.
    '''
    ndk_directory = os.path.abspath(source)

    while True:
        parent = os.path.dirname(ndk_directory)

        # dirname() of a filesystem root is the root itself.  Detecting the
        # root this way (rather than comparing against '/') also terminates
        # on platforms whose roots are not spelled '/'.
        if parent == ndk_directory:
            return None

        if os.path.exists(os.path.join(ndk_directory, 'jni')):
            return ndk_directory

        ndk_directory = parent
×
1296

1297
_android_mk_template = '''
1✔
1298
LOCAL_PATH := $(call my-dir)
1299

1300
include $(CLEAR_VARS)
1301
LOCAL_MODULE := %(local_module)s
1302
LOCAL_SRC_FILES := %(local_src_files)s
1303

1304
include $(BUILD_EXECUTABLE)
1305
'''.lstrip()
1306

1307
_application_mk_template = '''
1✔
1308
LOCAL_PATH := $(call my-dir)
1309

1310
include $(CLEAR_VARS)
1311
APP_ABI:= %(app_abi)s
1312
APP_PLATFORM:=%(app_platform)s
1313
'''.lstrip()
1314

1315
def _generate_ndk_project(file_list, abi='armeabi-v7a', platform_version=21):
    """Creates a throw-away NDK project around the given source file(s).

    A fresh temporary directory is created containing a ``jni`` directory
    with copies of the sources plus generated ``Android.mk`` and
    ``Application.mk`` files.

    Arguments:
        file_list: A single path, or a list/tuple of paths, to source files.
            The module name is derived from the first file's base name.
        abi(str): Value written to ``APP_ABI``.
        platform_version: Value appended to ``android-`` for ``APP_PLATFORM``.

    Returns:
        Path of the newly created project root directory.
    """
    if not isinstance(file_list, (list, tuple)):
        file_list = [file_list]

    # Create our project root
    project_root = tempfile.mkdtemp()

    # Copy over the source file(s)
    jni_directory = os.path.join(project_root, 'jni')
    os.mkdir(jni_directory)
    for source_path in file_list:
        shutil.copy(source_path, jni_directory)

    # Populate Android.mk
    local_module, _ = os.path.splitext(os.path.basename(file_list[0]))
    android_mk_values = {
        'local_module': local_module,
        'local_src_files': ' '.join(os.path.basename(p) for p in file_list),
    }
    with open(os.path.join(jni_directory, 'Android.mk'), 'w') as f:
        f.write(_android_mk_template % android_mk_values)

    # Populate Application.mk
    application_mk_values = {
        'app_abi': abi,
        'app_platform': 'android-%s' % platform_version,
    }
    with open(os.path.join(jni_directory, 'Application.mk'), 'w') as f:
        f.write(_application_mk_template % application_mk_values)

    return project_root
×
1346

1347
def compile(source):
    r"""Compile a source file or project with the Android NDK.

    ``ndk-build`` is looked up on the path first, then in the directory named
    by one of the ``NDK``, ``ANDROID_NDK``, ``ANDROID_NDK_ROOT``,
    ``ANDROID_NDK_HOME`` or ``ANDROID_NDK_LATEST_HOME`` environment variables.

    Arguments:
        source(str): Path to a source file, or to a path inside an existing
            NDK project (one with a ``jni`` directory).

    Returns:
        Path to one of the files produced under the project's ``libs``
        directory.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> temp = tempfile.mktemp('.c')
        >>> write(temp, '''
        ... #include <stdio.h>
        ... static char buf[4096];
        ... int main() {
        ...   FILE *fp = fopen("/proc/self/maps", "r");
        ...   int n = fread(buf, 1, sizeof(buf), fp);
        ...   fwrite(buf, 1, n, stdout);
        ...   return 0;
        ... }''')
        >>> filename = adb.compile(temp)
        >>> sent = adb.push(filename, "/data/local/tmp")
        >>> adb.process(sent).recvall() # doctest: +ELLIPSIS
        b'... /system/lib64/libc.so\n...'
    """

    ndk_build = misc.which('ndk-build')
    if not ndk_build:
        # Ensure that we can find the NDK.
        for envvar in ('NDK', 'ANDROID_NDK', 'ANDROID_NDK_ROOT',
                       'ANDROID_NDK_HOME', 'ANDROID_NDK_LATEST_HOME'):
            ndk = os.environ.get(envvar)
            if ndk is not None:
                break
        else:
            log.error('$NDK must be set to the Android NDK directory')
        ndk_build = os.path.join(ndk, 'ndk-build')

    # Determine whether the source is an NDK project or a single source file.
    project = find_ndk_project_root(source)

    if not project:
        # Realistically this should inherit from context.arch, but
        # this works for now.
        sdk = '21'
        abi = {
            'aarch64': 'arm64-v8a',
            'amd64':   'x86_64',
            'arm':     'armeabi-v7a',
            'i386':    'x86',
            'mips':    'mips',
            'mips64':  'mips64',
        }.get(context.arch, None)

        # If we have an attached device, use its settings.
        if context.device:
            abi = getprop('ro.product.cpu.abi')
            sdk = getprop('ro.build.version.sdk')

        if abi is None:
            log.error("Unknown CPU ABI")

        project = _generate_ndk_project(source, abi, sdk)

    # Remove any output files
    lib = os.path.join(project, 'libs')
    if os.path.exists(lib):
        shutil.rmtree(lib)

    # Build the project
    io = tubes.process.process(ndk_build, cwd=os.path.join(project, 'jni'))

    result = io.recvall()

    if 0 != io.poll():
        log.error("Build failed:\n%s", result)

    # Find all of the output files
    output = glob.glob(os.path.join(lib, '*', '*'))

    if not output:
        # Report a meaningful error instead of an IndexError below
        log.error("Build produced no output files in %s", lib)

    return output[0]
×
1426

1427
class Partition(object):
    """Describes one partition: its device path, name and size."""
    def __init__(self, path, name, blocks=0):
        self.path, self.name, self.blocks = path, name, blocks
        # Size is derived from the block count at 1024 per block
        self.size = blocks * 1024

    @property
    def data(self):
        """Reads and returns the contents of the partition's device path."""
        message = 'Fetching %r partition (%s)' % (self.name, self.path)
        with log.waitfor(message):
            return read(self.path)
×
1438

1439
@with_device
def walk(top, topdown=True):
    """Recursively walks a directory tree on the device.

    Yields ``(directory, dirs, nondirs)`` tuples in the manner of
    :func:`os.walk`.

    Arguments:
        top(str): Directory to start from.
        topdown(bool): If true, a directory is yielded before its
            subdirectories; otherwise after them.
    """
    listing = AdbClient().list(top)

    dirs = []
    nondirs = []
    for entry, metadata in listing.items():
        # Sort each entry into the matching bucket based on its mode bits
        bucket = dirs if stat.S_ISDIR(metadata['mode']) else nondirs
        bucket.append(entry)

    if topdown:
        yield top, dirs, nondirs

    for entry in dirs:
        for result in walk(os.path.join(top, entry), topdown):
            yield result

    if not topdown:
        yield top, dirs, nondirs
×
1461

1462
@with_device
def find(top, name):
    """Yields the path of every entry called ``name`` found beneath ``top``.

    Both files and directories are matched.
    """
    for parent, dirs, files in walk(top):
        if name not in files and name not in dirs:
            continue
        yield os.path.join(parent, name)
×
1467

1468
@with_device
def readlink(path):
    """Resolves ``path`` on the device with ``realpath``.

    Returns:
        The decoded command output with one trailing newline removed.
    """
    resolved = process(['realpath', path]).recvall()

    # The command emits a single trailing newline.
    # We can't use the '-n' flag since old versions don't support it
    if resolved[-1:] == b'\n':
        resolved = resolved[:-1]

    return resolved.decode()
×
1478

1479
class Partitions(object):
    """Enable access to partitions

    Iterating yields partition names; attribute access returns a
    :class:`Partition` for the named partition.

    Example:

    .. doctest::
       :skipif: skip_android

        >>> hex(adb.partitions.vda.size) # doctest: +ELLIPSIS
        '0x...000'
    """
    @property
    @context.quietfunc
    def by_name_dir(self):
        """Directory holding the named partition links.

        Falls back to ``/dev/block`` when no ``by-name`` directory is found
        beneath ``/dev/block/platform``.
        """
        try:
            with context.local(log_level=logging.FATAL):
                return next(find('/dev/block/platform','by-name'))
        except (StopIteration, PwnlibException):
            return '/dev/block'

    @context.quietfunc
    def __dir__(self):
        return list(self)

    @context.quietfunc
    def iter_proc_partitions(self):
        """Yields ``(blocks, name)`` for every entry of ``/proc/partitions``."""
        for line in read('/proc/partitions').splitlines():
            if not line.strip():
                continue
            major, minor, blocks, name = line.split(None, 4)
            if not blocks.isdigit():
                # The column header row ("major minor #blocks name") is not
                # a partition; its block column is not numeric.
                continue
            yield blocks, name.decode()

    @context.quietfunc
    @with_device
    def __iter__(self):
        root()

        # Find all named partitions
        names = set(listdir(self.by_name_dir))

        # Find all unnamed partitions
        for _, name in self.iter_proc_partitions():
            names.add(name)
        return iter(names)

    def __getattr__(self, attr):
        # Private/dunder lookups must fail normally
        if attr.startswith('_'):
            raise AttributeError(attr)

        if not any(name == attr for name in self):
            raise AttributeError("No partition %r" % attr)

        path = os.path.join(self.by_name_dir, attr)

        with context.quiet:
            # Find the actual path of the device
            devpath = readlink(path)
            devname = os.path.basename(devpath)

            # Get the size of the partition
            for blocks, name in self.iter_proc_partitions():
                if name in (devname, attr):
                    break
            else:
                # Report the partition that was asked for, not whatever the
                # loop variable happened to hold last.
                log.error("Could not find size of partition %r" % attr)

        return Partition(devpath, attr, int(blocks))

partitions = Partitions()
1✔
1551

1552
def install(apk, *arguments):
    """Install an APK onto the device.

    This is a wrapper around 'pm install', which backs 'adb install'.

    The APK is first pushed to ``/data/local/tmp`` and then handed to the
    package manager.  An error is logged unless the last line of the package
    manager output contains ``Success``.

    Arguments:
        apk(str): Path to the APK to install (e.g. ``'foo.apk'``)
        arguments: Supplementary arguments to 'pm install',
            e.g. ``'-l', '-g'``.
    """
    if not apk.endswith('.apk'):
        log.error("APK must have .apk extension")

    basename = os.path.basename(apk)
    target_path = '/data/local/tmp/{}.apk'.format(basename)

    with log.progress("Installing APK {}".format(basename)) as p:
        with context.quiet:
            p.status('Copying APK to device')
            push(apk, target_path)

            p.status('Installing')
            # NOTE(review): the docstring describes 'pm install' but the
            # command issued is 'pm install-create' -- confirm which is intended.
            result = process(['pm', 'install-create', target_path] + list(arguments)).recvall()

            # The output arrives as bytes; decode it before inspecting it so
            # that the substring test works on text.
            lines = result.decode('utf-8', 'replace').splitlines()
            status = lines[-1] if lines else 'No output from package manager'
            if 'Success' not in status:
                # Pass the status as an argument so that any '%' it contains
                # is not interpreted as a format directive.
                log.error("%s", status)
×
1579

1580
def uninstall(package, *arguments):
    """Uninstall a package from the device.

    This is a wrapper around 'pm uninstall', which backs 'adb uninstall'.

    Arguments:
        package(str): Name of the package to uninstall (e.g. ``'com.foo.MyPackage'``)
        arguments: Supplementary arguments to ``'pm uninstall'``, e.g. ``'-k'``.

    Returns:
        The output of the ``pm uninstall`` command.
    """
    argv = ['pm', 'uninstall', package]
    argv.extend(arguments)

    with log.progress("Uninstalling package {}".format(package)):
        with context.quiet:
            return process(argv).recvall()
×
1592

1593
@context.quietfunc
def packages():
    """Returns a list of the package names reported by ``pm list packages``."""
    raw = process(['pm', 'list', 'packages']).recvall()

    # Decodes the received bytes as UTF-8 per:
    # https://developer.android.com/reference/java/nio/charset/Charset#defaultCharset()
    # where it is specified that UTF-8 is the default charset for Android.
    listing = raw.decode('utf-8')

    names = []
    for line in listing.splitlines():
        # Keep whatever follows the first 'package:' marker on the line
        names.append(line.split('package:', 1)[-1])
    return names
×
1601

1602
@context.quietfunc
def version():
    """Returns the platform version as a list of integers.

    The ``ro.build.version.release`` property is split on ``'.'`` and each
    component is converted to ``int``.
    """
    release = getprop('ro.build.version.release')
    return list(map(int, release.split('.')))
×
1607

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc