• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Gallopsled / pwntools / 8912ca5a8c3a9725c3ba6d30561607150a6faebe-PR-2205

pending completion
8912ca5a8c3a9725c3ba6d30561607150a6faebe-PR-2205

Pull #2205

github-actions

web-flow
Merge 81f463e2c into 8b4cacf8b
Pull Request #2205: Fix stable Python 2 installation from a built wheel

3878 of 6371 branches covered (60.87%)

12199 of 16604 relevant lines covered (73.47%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.07
/pwnlib/adb/adb.py
1
"""Provides utilities for interacting with Android devices via the Android Debug Bridge.
2

3
Using Android Devices with Pwntools
4
-----------------------------------
5

6
Pwntools tries to be as easy as possible to use with Android devices.
7

8
If you have only one device attached, everything "just works".
9

10
If you have multiple devices, you have a handful of options to select one, or iterate
11
over the devices.
12

13
First and most important is the ``context.device`` property, which declares the "currently"
14
selected device in any scope.  It can be set manually to a serial number, or to a ``Device``
15
instance.
16

17
.. code-block:: python
18

19
    # Take the first available device
20
    context.device = adb.wait_for_device()
21

22
    # Set a device by serial number
23
    context.device = 'ZX1G22LH8S'
24

25
    # Set a device by its product name
26
    for device in adb.devices():
27
        if device.product == 'shamu':
28
            break
29
    else:
30
        error("Could not find any shamus!")
31

32
Once a device is selected, you can operate on it with any of the functions in
33
the :mod:`pwnlib.adb` module.
34

35
.. code-block:: python
36

37
    # Get a process listing
38
    print(adb.process(['ps']).recvall())
39

40
    # Fetch properties
41
    print(adb.properties.ro.build.fingerprint)
42

43
    # Read and write files
44
    print(adb.read('/proc/version'))
45
    adb.write('/data/local/tmp/foo', 'my data')
46

47
"""
48
from __future__ import absolute_import
1✔
49
from __future__ import division
1✔
50

51
import functools
1✔
52
import glob
1✔
53
import logging
1✔
54
import os
1✔
55
import re
1✔
56
import shutil
1✔
57
import six
1✔
58
import stat
1✔
59
import tempfile
1✔
60
import time
1✔
61

62
import dateutil.parser
1✔
63

64
from pwnlib import atexit
1✔
65
from pwnlib import tubes
1✔
66
from pwnlib.context import LocalContext
1✔
67
from pwnlib.context import context
1✔
68
from pwnlib.device import Device
1✔
69
from pwnlib.log import getLogger
1✔
70
from pwnlib.protocols.adb import AdbClient
1✔
71
from pwnlib.util.packing import _decode
1✔
72
from pwnlib.util import misc
1✔
73

74
log = getLogger(__name__)
1✔
75

76
def adb(argv, *a, **kw):
1✔
77
    r"""Returns the output of an ADB subcommand.
78

79
    .. doctest::
80
       :skipif: skip_android
81

82
        >>> adb.adb('get-serialno')
83
        b'emulator-5554\n'
84
        >>> adb.adb(['shell', 'uname']) # it is better to use adb.process
85
        b'Linux\n'
86
    """
87
    if isinstance(argv, (bytes, six.text_type)):
×
88
        argv = [argv]
×
89

90
    log.debug("$ " + ' '.join(context.adb + argv))
×
91

92
    # All "adb shell" incantations should go through adb.process()
93
    if argv[0] == 'shell':
×
94
        return process(argv[1:], *a, **kw).recvall()
×
95

96
    return tubes.process.process(context.adb + argv, *a, **kw).recvall()
×
97

98
@context.quietfunc
1✔
99
def devices(serial=None):
1✔
100
    """Returns a list of ``Device`` objects corresponding to the connected devices."""
101
    with AdbClient() as c:
×
102
        lines = c.devices(long=True)
×
103
    result = []
×
104

105
    for line in lines.splitlines():
×
106
        # Skip the first 'List of devices attached' line, and the final empty line.
107
        if 'List of devices' in line or not line.strip():
×
108
            continue
×
109
        device = AdbDevice.from_adb_output(line)
×
110
        if device.serial == serial:
×
111
            return device
×
112
        result.append(device)
×
113

114
    return tuple(result)
×
115

116
def current_device(any=False):
1✔
117
    """Returns an ``AdbDevice`` instance for the currently-selected device
118
    (via ``context.device``).
119

120
    .. doctest::
121
       :skipif: skip_android
122

123
        >>> device = adb.current_device(any=True)
124
        >>> device  # doctest: +ELLIPSIS
125
        AdbDevice(serial='emulator-5554', type='device', port='emulator', product='sdk_...phone_armv7', model='sdk ...phone armv7', device='generic')
126
        >>> device.port
127
        'emulator'
128
    """
129
    all_devices = devices()
×
130
    for device in all_devices:
×
131
        if any or device == context.device:
×
132
            return device
×
133

134
def with_device(f):
1✔
135
    @functools.wraps(f)
1✔
136
    def wrapper(*a,**kw):
137
        if not context.device:
×
138
            device = current_device(any=True)
×
139
            if device:
×
140
                log.warn_once('Automatically selecting device %s' % device)
×
141
                context.device = device
×
142
        if not context.device:
×
143
            log.error('No devices connected, cannot invoke %s.%s' % (f.__module__, f.__name__))
×
144
        return f(*a,**kw)
×
145
    return wrapper
1✔
146

147

148
@with_device
1✔
149
def root():
150
    """Restarts adbd as root.
151

152
    .. doctest::
153
       :skipif: skip_android
154

155
        >>> adb.root()
156
    """
157
    log.info("Enabling root on %s" % context.device)
×
158

159
    with context.quiet:
×
160
        with AdbClient() as c:
×
161
            reply = c.root()
×
162

163
    if 'already running as root' in reply:
×
164
        return
×
165

166
    elif not reply or 'restarting adbd as root' in reply:
×
167
        with context.quiet:
×
168
            wait_for_device()
×
169

170
    else:
171
        log.error("Could not run as root:\n%s" % reply)
×
172

173
def no_emulator(f):
1✔
174
    @functools.wraps(f)
1✔
175
    def wrapper(*a,**kw):
176
        c = current_device()
×
177
        if c and c.port == 'emulator':
×
178
            log.error("Cannot invoke %s.%s on an emulator." % (f.__module__, f.__name__))
×
179
        return f(*a,**kw)
×
180
    return wrapper
1✔
181

182
@no_emulator
1✔
183
@with_device
1✔
184
def reboot(wait=True):
1✔
185
    """Reboots the device.
186
    """
187
    log.info('Rebooting device %s' % context.device)
×
188

189
    with AdbClient() as c:
×
190
        c.reboot()
×
191

192
    if wait:
×
193
        wait_for_device()
×
194

195
@no_emulator
1✔
196
@with_device
1✔
197
def reboot_bootloader():
198
    """Reboots the device to the bootloader.
199
    """
200
    log.info('Rebooting %s to bootloader' % context.device)
×
201

202
    with AdbClient() as c:
×
203
        c.reboot_bootloader()
×
204

205
@with_device
1✔
206
def uptime():
207
    """uptime() -> float
208

209
    Returns:
210
        Uptime of the device, in seconds
211

212
    Example:
213

214
    .. doctest::
215
       :skipif: skip_android
216

217
        >>> adb.uptime() > 3 # normally AVD takes ~7 seconds to boot
218
        True
219
    """
220
    up, idle = map(float, read('/proc/uptime').split())
×
221
    return up
×
222

223
@with_device
1✔
224
def boot_time():
225
    """boot_time() -> int
226

227
    Returns:
228
        Boot time of the device, in Unix time, rounded to the
229
        nearest second.
230

231
    Example:
232

233
    .. doctest::
234
       :skipif: skip_android
235

236
        >>> import time
237
        >>> adb.boot_time() < time.time()
238
        True
239
    """
240
    for line in read('/proc/stat').splitlines():
×
241
        name, value = line.split(None, 1)
×
242
        if name == b'btime':
×
243
            return int(value)
×
244

245
class AdbDevice(Device):
1✔
246
    """Encapsulates information about a connected device.
247

248
    Example:
249

250
    .. doctest::
251
       :skipif: skip_android
252

253
        >>> device = adb.wait_for_device()
254
        >>> device.arch
255
        'arm'
256
        >>> device.bits
257
        32
258
        >>> device.os
259
        'android'
260
        >>> device.product  # doctest: +ELLIPSIS
261
        'sdk_...phone_armv7'
262
        >>> device.serial
263
        'emulator-5554'
264
    """
265
    def __init__(self, serial, type, port=None, product='unknown', model='unknown', device='unknown', features=None, **kw):
1✔
266
        self.serial  = serial
×
267
        self.type    = type
×
268
        self.port    = port
×
269
        self.product = product
×
270
        self.model   = model.replace('_', ' ')
×
271
        self.device  = device
×
272
        self.os      = 'android'
×
273

274
        if product == 'unknown':
×
275
            return
×
276

277
        # Deferred fields
278
        self._initialized = False
×
279
        self._arch = None
×
280
        self._bits = None
×
281
        self._endian = None
×
282
        self._avd = None
×
283

284
    @property
1✔
285
    def arch(self):
286
        self.__do_deferred_initialization()
×
287
        return self._arch
×
288

289
    @property
1✔
290
    def avd(self):
291
        self.__do_deferred_initialization()
×
292
        return self._avd
×
293

294
    @property
1✔
295
    def bits(self):
296
        self.__do_deferred_initialization()
×
297
        return self._bits
×
298

299
    @property
1✔
300
    def endian(self):
301
        self.__do_deferred_initialization()
×
302
        return self._endian
×
303

304

305
    def __do_deferred_initialization(self):
1✔
306
        if self._initialized:
×
307
            return
×
308

309
        with context.local(device=self.serial):
×
310
            abi = getprop('ro.product.cpu.abi')
×
311
            context.clear()
×
312
            context.arch = str(abi)
×
313
            self._arch = context.arch
×
314
            self._bits = context.bits
×
315
            self._endian = context.endian
×
316

317
        if self.port == 'emulator':
×
318
            emulator, port = self.serial.split('-')
×
319
            port = int(port)
×
320
            try:
×
321
                with remote('localhost', port, level='error') as r:
×
322
                    r.recvuntil('OK')
×
323
                    r.recvline() # Rest of the line
×
324
                    r.sendline('avd name')
×
325
                    self.avd = r.recvline().strip()
×
326
            except:
×
327
                pass
×
328

329
        self._initialized = True
×
330

331
    def __str__(self):
1✔
332
        return self.serial
×
333

334
    def __repr__(self):
1✔
335
        fields = ['serial', 'type', 'port', 'product', 'model', 'device']
×
336
        return '%s(%s)' % (self.__class__.__name__,
×
337
                           ', '.join(('%s=%r' % (field, getattr(self, field)) for field in fields)))
338

339
    @staticmethod
1✔
340
    def from_adb_output(line):
341
        fields = line.split()
×
342

343
        """
344
        Example output:
345
        ZX1G22LM7G             device usb:336789504X product:shamu model:Nexus_6 device:shamu features:cmd,shell_v2
346
        84B5T15A29020449       device usb:336855040X product:angler model:Nexus_6P device:angler
347
        0062741b0e54b353       unauthorized usb:337641472X
348
        emulator-5554          offline
349
        emulator-5554          device product:sdk_phone_armv7 model:sdk_phone_armv7 device:generic
350
        """
351

352
        fields = line.split()
×
353

354
        serial = fields[0]
×
355
        type   = fields[1]
×
356
        kwargs = {}
×
357

358
        if serial.startswith('emulator-'):
×
359
            kwargs['port'] = 'emulator'
×
360

361
        for field in fields[2:]:
×
362
            k,v = field.split(':', 1)
×
363
            kwargs[k] = v
×
364

365
        return AdbDevice(serial, type, **kwargs)
×
366

367
    def __wrapped(self, function):
1✔
368
        """Wrapps a callable in a scope which selects the current device."""
369
        @functools.wraps(function)
×
370
        def wrapper(*a, **kw):
371
            with context.local(device=self):
×
372
                return function(*a,**kw)
×
373
        return wrapper
×
374

375
    def __getattr__(self, name):
1✔
376
        """Provides scoped access to ``adb`` module propertise, in the context
377
        of this device.
378

379
        .. doctest::
380
           :skipif: skip_android
381

382
            >>> property = 'ro.build.fingerprint'
383
            >>> device = adb.wait_for_device()
384
            >>> adb.getprop(property) == device.getprop(property)
385
            True
386
        """
387
        if name.startswith('_'):
×
388
            raise AttributeError(name)
×
389

390
        with context.local(device=self):
×
391
            g = globals()
×
392

393
            if name not in g:
×
394
                raise AttributeError('%r object has no attribute %r' % (type(self).__name__,name))
×
395
            value = g[name]
×
396

397
        if not hasattr(value, '__call__'):
×
398
            return value
×
399

400
        return self.__wrapped(value)
×
401

402
@LocalContext
1✔
403
def wait_for_device(kick=False):
1✔
404
    """Waits for a device to be connected.
405

406
    By default, waits for the currently-selected device (via ``context.device``).
407
    To wait for a specific device, set ``context.device``.
408
    To wait for *any* device, clear ``context.device``.
409

410
    Return:
411
        An ``AdbDevice`` instance for the device.
412

413
    Examples:
414

415
    .. doctest::
416
       :skipif: skip_android
417

418
        >>> device = adb.wait_for_device()
419
    """
420
    with log.waitfor("Waiting for device to come online") as w:
×
421
        with AdbClient() as c:
×
422
            if kick:
×
423
                try:
×
424
                    c.reconnect()
×
425
                except Exception:
×
426
                    pass
×
427

428
            serial = ''
×
429
            if context.device:
×
430
                serial = str(context.device)
×
431

432
        with AdbClient() as c:
×
433
            c.wait_for_device(serial)
×
434

435
        for device in devices():
×
436
            if context.device == device:
×
437
                return device
×
438

439
            if not serial:
×
440
                break
×
441
        else:
442
            log.error("Could not find any devices")
×
443

444
        with context.local(device=device):
×
445
            # There may be multiple devices, so context.device is
446
            # insufficient.  Pick the first device reported.
447
            w.success('%s (%s %s %s)' % (device,
×
448
                                         product(),
449
                                         build(),
450
                                         _build_date()))
451

452
            return context.device
×
453

454
@with_device
1✔
455
def disable_verity():
456
    """Disables dm-verity on the device."""
457
    with log.waitfor("Disabling dm-verity on %s" % context.device):
×
458
        root()
×
459

460
        with AdbClient() as c:
×
461
            reply = c.disable_verity()
×
462

463
        if 'Verity already disabled' in reply:
×
464
            return
×
465
        elif 'Now reboot your device' in reply:
×
466
            reboot(wait=True)
×
467
        elif '0006closed' in reply:
×
468
            return # Emulator doesnt support Verity?
×
469
        else:
470
            log.error("Could not disable verity:\n%s" % reply)
×
471

472
@with_device
1✔
473
def remount():
474
    """Remounts the filesystem as writable."""
475
    with log.waitfor("Remounting filesystem on %s" % context.device):
×
476
        disable_verity()
×
477
        root()
×
478

479
        with AdbClient() as c:
×
480
            reply = c.remount()
×
481

482
        if 'remount succeeded' not in reply:
×
483
            log.error("Could not remount filesystem:\n%s" % reply)
×
484

485
@with_device
1✔
486
def unroot():
487
    """Restarts adbd as AID_SHELL."""
488
    log.info("Unrooting %s" % context.device)
×
489
    with context.quiet:
×
490
        with AdbClient() as c:
×
491
            reply  = c.unroot()
×
492

493
    if '0006closed' == reply:
×
494
        return # Emulator doesnt care
×
495

496
    if 'restarting adbd as non root' not in reply:
×
497
        log.error("Could not unroot:\n%s" % reply)
×
498

499
def _create_adb_push_pull_callback(w):
1✔
500
    def callback(filename, data, size, chunk, chunk_size):
×
501
        have = len(data) + len(chunk)
×
502
        if size == 0:
×
503
            size = '???'
×
504
            percent = '???'
×
505
        else:
506
            percent = int(100 * have // size)
×
507
            size = misc.size(size)
×
508
        have = misc.size(have)
×
509
        w.status('%s/%s (%s%%)' % (have, size, percent))
×
510
        return True
×
511
    return callback
×
512

513
@with_device
1✔
514
def pull(remote_path, local_path=None):
1✔
515
    """Download a file from the device.
516

517
    Arguments:
518
        remote_path(str): Path or directory of the file on the device.
519
        local_path(str): Path to save the file to.
520
            Uses the file's name by default.
521

522
    Return:
523
        The contents of the file.
524

525
    Example:
526

527
    .. doctest::
528
       :skipif: skip_android
529

530
        >>> _=adb.pull('/proc/version', './proc-version')
531
        >>> print(read('./proc-version').decode('utf-8')) # doctest: +ELLIPSIS
532
        Linux version ...
533
    """
534
    if local_path is None:
×
535
        local_path = os.path.basename(remote_path)
×
536

537
    msg = "Pulling %r to %r" % (remote_path, local_path)
×
538

539
    if log.isEnabledFor(logging.DEBUG):
×
540
        msg += ' (%s)' % context.device
×
541

542
    with log.waitfor(msg) as w:
×
543
        data = read(remote_path, callback=_create_adb_push_pull_callback(w))
×
544
        misc.write(local_path, data)
×
545

546
    return data
×
547

548
@with_device
1✔
549
def push(local_path, remote_path):
550
    """Upload a file to the device.
551

552
    Arguments:
553
        local_path(str): Path to the local file to push.
554
        remote_path(str): Path or directory to store the file on the device.
555

556
    Returns:
557
        Remote path of the file.
558

559
    Example:
560

561
    .. doctest::
562
       :skipif: skip_android
563

564
        >>> write('./filename', 'contents')
565
        >>> adb.push('./filename', '/data/local/tmp')
566
        '/data/local/tmp/filename'
567
        >>> adb.read('/data/local/tmp/filename')
568
        b'contents'
569
        >>> adb.push('./filename', '/does/not/exist')
570
        Traceback (most recent call last):
571
        ...
572
        PwnlibException: Could not stat '/does/not/exist'
573
    """
574
    msg = "Pushing %r to %r" % (local_path, remote_path)
×
575
    remote_filename = os.path.basename(local_path)
×
576

577
    if log.isEnabledFor(logging.DEBUG):
×
578
        msg += ' (%s)' % context.device
×
579

580
    with log.waitfor(msg) as w:
×
581
        with AdbClient() as c:
×
582

583
            # We need to discover whether remote_path is a directory or not.
584
            # If we cannot stat the full path, assume it's a path-plus-filename,
585
            # where the filename does not exist.
586
            stat_ = c.stat(remote_path)
×
587
            if not stat_:
×
588
                remote_filename = os.path.basename(remote_path)
×
589
                remote_path = os.path.dirname(remote_path)
×
590
                stat_ = c.stat(remote_path)
×
591

592
            # If we can't find the exact path, or its parent directory, bail!
593
            if not stat_:
×
594
                log.error('Could not stat %r' % remote_path)
×
595

596
            # If we found the parent directory, append the filename
597
            mode = stat_['mode']
×
598
            if stat.S_ISDIR(mode):
×
599
                remote_path = os.path.join(remote_path, remote_filename)
×
600

601
            c.write(remote_path,
×
602
                    misc.read(local_path),
603
                    callback=_create_adb_push_pull_callback(w))
604

605
    return remote_path
×
606

607
@context.quietfunc
1✔
608
@with_device
1✔
609
def read(path, target=None, callback=None):
1✔
610
    """Download a file from the device, and extract its contents.
611

612
    Arguments:
613
        path(str): Path to the file on the device.
614
        target(str): Optional, location to store the file.
615
            Uses a temporary file by default.
616
        callback(callable): See the documentation for
617
            ``adb.protocol.AdbClient.read``.
618

619
    Examples:
620

621
    .. doctest::
622
       :skipif: skip_android
623

624
        >>> print(adb.read('/proc/version').decode('utf-8')) # doctest: +ELLIPSIS
625
        Linux version ...
626
        >>> adb.read('/does/not/exist')
627
        Traceback (most recent call last):
628
        ...
629
        PwnlibException: Could not stat '/does/not/exist'
630
    """
631
    with AdbClient() as c:
×
632
        stat = c.stat(path)
×
633
        if not stat:
×
634
            log.error('Could not stat %r' % path)
×
635
        data = c.read(path, stat['size'], callback=callback)
×
636

637
    if target:
×
638
        misc.write(target, data)
×
639

640
    return data
×
641

642
@context.quietfunc
1✔
643
@with_device
1✔
644
def write(path, data=b''):
1✔
645
    """Create a file on the device with the provided contents.
646

647
    Arguments:
648
        path(str): Path to the file on the device
649
        data(str): Contents to store in the file
650

651
    Examples:
652

653
    .. doctest::
654
       :skipif: skip_android
655

656
        >>> adb.write('/dev/null', b'data')
657
        >>> adb.write('/data/local/tmp/')
658
    """
659
    with tempfile.NamedTemporaryFile() as temp:
×
660
        misc.write(temp.name, data)
×
661
        push(temp.name, path)
×
662

663
@context.quietfunc
1✔
664
@with_device
1✔
665
def mkdir(path):
666
    """Create a directory on the target device.
667

668
    Note:
669
        Silently succeeds if the directory already exists.
670

671
    Arguments:
672
        path(str): Directory to create.
673

674
    Examples:
675

676
    .. doctest::
677
       :skipif: skip_android
678

679
        >>> adb.mkdir('/')
680

681
        >>> path = '/data/local/tmp/mkdir_test'
682
        >>> adb.exists(path)
683
        False
684
        >>> adb.mkdir(path)
685
        >>> adb.exists(path)
686
        True
687

688
        >>> adb.mkdir('/init')
689
        Traceback (most recent call last):
690
        ...
691
        PwnlibException: mkdir failed for /init, File exists
692
    """
693
    if not path.startswith('/'):
×
694
        log.error("Must provide an absolute path: %r" % path)
×
695

696
    with AdbClient() as c:
×
697
        st = c.stat(path)
×
698

699
        # Don't re-create existing directories
700
        if st and stat.S_ISDIR(st['mode']):
×
701
            return
×
702

703
        result = process(['mkdir', path]).recvall()
×
704

705
        # Any output at all is an error
706
        if result:
×
707
            log.error(result.rstrip().decode('utf-8'))
×
708

709
@context.quietfunc
1✔
710
@with_device
1✔
711
def makedirs(path):
712
    """Create a directory and all parent directories on the target device.
713

714
    Note:
715
        Silently succeeds if the directory already exists.
716

717
    Examples:
718

719
    .. doctest::
720
       :skipif: skip_android
721

722
        >>> adb.makedirs('/data/local/tmp/this/is/a/directory/hierarchy')
723
        >>> adb.listdir('/data/local/tmp/this/is/a/directory')
724
        ['hierarchy']
725
    """
726
    if path != '/':
×
727
        makedirs(os.path.dirname(path))
×
728

729
    mkdir(path)
×
730

731
@context.quietfunc
1✔
732
@with_device
1✔
733
def exists(path):
734
    """Return :const:`True` if ``path`` exists on the target device.
735

736
    Examples:
737

738
    .. doctest::
739
       :skipif: skip_android
740

741
        >>> adb.exists('/')
742
        True
743
        >>> adb.exists('/etc/hosts')
744
        True
745
        >>> adb.exists('/does/not/exist')
746
        False
747
    """
748
    with AdbClient() as c:
×
749
        return bool(c.stat(path))
×
750

751
@context.quietfunc
1✔
752
@with_device
1✔
753
def isdir(path):
754
    """Return :const:`True` if ``path`` is a on the target device.
755

756
    Examples:
757

758
    .. doctest::
759
       :skipif: skip_android
760

761
        >>> adb.isdir('/')
762
        True
763
        >>> adb.isdir('/init')
764
        False
765
        >>> adb.isdir('/does/not/exist')
766
        False
767
    """
768
    with AdbClient() as c:
×
769
        st = c.stat(path)
×
770
        return bool(st and stat.S_ISDIR(st['mode']))
×
771

772
@context.quietfunc
1✔
773
@with_device
1✔
774
def unlink(path, recursive=False):
1✔
775
    """Unlinks a file or directory on the target device.
776

777
    Examples:
778

779
    .. doctest::
780
       :skipif: skip_android
781

782
        >>> adb.unlink("/does/not/exist")
783
        Traceback (most recent call last):
784
        ...
785
        PwnlibException: Could not unlink '/does/not/exist': Does not exist
786

787
        >>> filename = '/data/local/tmp/unlink-test'
788
        >>> adb.write(filename, 'hello')
789
        >>> adb.exists(filename)
790
        True
791
        >>> adb.unlink(filename)
792
        >>> adb.exists(filename)
793
        False
794

795
        >>> adb.mkdir(filename)
796
        >>> adb.write(filename + '/contents', 'hello')
797
        >>> adb.unlink(filename)
798
        Traceback (most recent call last):
799
        ...
800
        PwnlibException: Cannot delete non-empty directory '/data/local/tmp/unlink-test' without recursive=True
801

802
        >>> adb.unlink(filename, recursive=True)
803
        >>> adb.exists(filename)
804
        False
805
    """
806
    with AdbClient() as c:
×
807
        st = c.stat(path)
×
808
        if not st:
×
809
            log.error("Could not unlink %r: Does not exist" % path)
×
810

811
        # If the directory is not empty, do not delete it
812
        if isdir(path) and c.list(path) and not recursive:
×
813
            log.error("Cannot delete non-empty directory %r without recursive=True" % path)
×
814

815
        flags = '-rf' if recursive else '-f'
×
816

817
        output = c.execute(['rm', flags, path]).recvall()
×
818

819
        if output:
×
820
            log.error(output.decode('utf-8'))
×
821

822
@with_device
1✔
823
def process(argv, *a, **kw):
824
    """Execute a process on the device.
825

826
    See :class:`pwnlib.tubes.process.process` documentation for more info.
827

828
    Returns:
829
        A :class:`pwnlib.tubes.process.process` tube.
830

831
    Examples:
832

833
    .. doctest::
834
       :skipif: skip_android
835

836
        >>> adb.root()
837
        >>> print(adb.process(['cat','/proc/version']).recvall().decode('utf-8')) # doctest: +ELLIPSIS
838
        Linux version ...
839
    """
840
    if isinstance(argv, (bytes, six.text_type)):
×
841
        argv = [argv]
×
842

843
    message = "Starting %s process %r" % ('Android', argv[0])
×
844

845
    if log.isEnabledFor(logging.DEBUG):
×
846
        if argv != [argv[0]]: message += ' argv=%r ' % argv
×
847

848
    with log.progress(message) as p:
×
849
        return AdbClient().execute(argv)
×
850

851
@with_device
1✔
852
def interactive(**kw):
853
    """Spawns an interactive shell."""
854
    return shell(**kw).interactive()
×
855

856
@with_device
1✔
857
def shell(**kw):
858
    """Returns an interactive shell."""
859
    return process(['sh', '-i'], **kw)
×
860

861
@with_device
1✔
862
def which(name, all = False, *a, **kw):
1✔
863
    """Retrieves the full path to a binary in ``$PATH`` on the device
864

865
    Arguments:
866
        name(str): Binary name
867
        all(bool): Whether to return all paths, or just the first
868
        *a: Additional arguments for :func:`.adb.process`
869
        **kw: Additional arguments for :func:`.adb.process`
870

871
    Returns:
872
        Either a path, or list of paths
873

874
    Example:
875

876
    .. doctest::
877
       :skipif: skip_android
878

879
        >>> adb.which('sh')
880
        '/system/bin/sh'
881
        >>> adb.which('sh', all=True)
882
        ['/system/bin/sh']
883

884
        >>> adb.which('foobar') is None
885
        True
886
        >>> adb.which('foobar', all=True)
887
        []
888
    """
889
    # Unfortunately, there is no native 'which' on many phones.
890
    which_cmd = '''
×
891
(IFS=:
892
  for directory in $PATH; do
893
      [ -x "$directory/{name}" ] || continue;
894
      echo -n "$directory/{name}\\x00";
895
  done
896
)
897
[ -x "{name}" ] && echo -n "$PWD/{name}\\x00"
898
'''.format(name=name)
899

900
    which_cmd = which_cmd.strip()
×
901
    data = process(['sh','-c', which_cmd], *a, **kw).recvall()
×
902
    data = _decode(data)
×
903
    result = []
×
904

905
    for path in data.split('\x00'):
×
906
        # Skip empty entries
907
        if not path:
×
908
            continue
×
909

910
        # Return the first entry if all=False
911
        if not all:
×
912
            return path
×
913

914
        # Accumulate all entries if all=True
915
        result.append(path)
×
916

917
    if all:
×
918
        return result
×
919

920
    return None
×
921

922

923
@with_device
1✔
924
def whoami():
925
    """Returns current shell user
926

927
    Example:
928

929
    .. doctest::
930
       :skipif: skip_android
931

932
       >>> adb.whoami()
933
       b'root'
934
    """
935
    return process(['sh','-ic','echo $USER']).recvall().strip()
×
936

937
@with_device
1✔
938
def forward(port):
939
    """Sets up a port to forward to the device."""
940
    tcp_port = 'tcp:%s' % port
×
941
    adb(['forward', tcp_port, tcp_port])
×
942
    atexit.register(lambda: adb(['forward', '--remove', tcp_port]))
×
943

944
@context.quietfunc
1✔
945
@with_device
1✔
946
def logcat(stream=False):
1✔
947
    """Reads the system log file.
948

949
    By default, causes logcat to exit after reading the file.
950

951
    Arguments:
952
        stream(bool): If :const:`True`, the contents are streamed rather than
953
            read in a one-shot manner.  Default is :const:`False`.
954

955
    Returns:
956
        If ``stream`` is :const:`False`, returns a string containing the log data.
957
        Otherwise, it returns a :class:`pwnlib.tubes.tube.tube` connected to the log output.
958
    """
959

960
    if stream:
×
961
        return process(['logcat'])
×
962
    else:
963
        return process(['logcat', '-d']).recvall()
×
964

965
@with_device
1✔
966
def pidof(name):
967
    """Returns a list of PIDs for the named process."""
968
    with context.quiet:
×
969
        # Older devices have a broken 'pidof', apparently.
970
        # Try pgrep first.
971
        io = process(['pgrep', name])
×
972
        data = io.recvall()
×
973

974
        if 'not found' in data:
×
975
            io = process(['pidof', name])
×
976
            data = io.recvall()
×
977

978
    return list(map(int, data.split()))
×
979

980
@with_device
1✔
981
def proc_exe(pid):
982
    """Returns the full path of the executable for the provided PID.
983

984
    Example:
985

986
    .. doctest::
987
       :skipif: skip_android
988

989
        >>> adb.proc_exe(1)
990
        b'/init'
991
    """
992
    with context.quiet:
×
993
        io  = process(['realpath','/proc/%d/exe' % pid])
×
994
        data = io.recvall().strip()
×
995
    return data
×
996

997
@with_device
1✔
998
def getprop(name=None):
1✔
999
    """Reads a properties from the system property store.
1000

1001
    Arguments:
1002
        name(str): Optional, read a single property.
1003

1004
    Returns:
1005
        If ``name`` is not specified, a ``dict`` of all properties is returned.
1006
        Otherwise, a string is returned with the contents of the named property.
1007

1008
    Example:
1009

1010
    .. doctest::
1011
       :skipif: skip_android
1012

1013
        >>> adb.getprop() # doctest: +ELLIPSIS
1014
        {...}
1015
    """
1016
    with context.quiet:
×
1017
        if name:
×
1018
            result = process(['getprop', name]).recvall().strip()
×
1019
            result = _decode(result)
×
1020
            return result
×
1021

1022
        result = process(['getprop']).recvall()
×
1023

1024
    result = _decode(result)
×
1025
    expr = r'\[([^\]]+)\]: \[(.*)\]'
×
1026

1027
    props = {}
×
1028
    for line in result.splitlines():
×
1029
        if not line.startswith('['):
×
1030
            continue
×
1031

1032
        name, value = re.search(expr, line).groups()
×
1033

1034
        if value.isdigit():
×
1035
            value = int(value)
×
1036

1037
        props[name] = value
×
1038

1039
    return props
×
1040

1041
@with_device
1✔
1042
def setprop(name, value):
1043
    """Writes a property to the system property store."""
1044
    return process(['setprop', name, value]).recvall().strip()
×
1045

1046
@with_device
1✔
1047
def listdir(directory='/'):
1✔
1048
    """Returns a list containing the entries in the provided directory.
1049

1050
    Note:
1051
        This uses the SYNC LIST functionality, which runs in the adbd
1052
        SELinux context.  If adbd is running in the su domain ('adb root'),
1053
        this behaves as expected.
1054

1055
        Otherwise, less files may be returned due to restrictive SELinux
1056
        policies on adbd.
1057
    """
1058
    return sorted(AdbClient().list(directory))
×
1059

1060
@with_device
1✔
1061
def fastboot(args, *a, **kw):
1062
    """Executes a fastboot command.
1063

1064
    Returns:
1065
        The command output.
1066
    """
1067
    argv = ['fastboot', '-s', str(context.device)] + list(args)
×
1068
    return tubes.process.process(argv, *a, **kw).recvall()
×
1069

1070
@with_device
1✔
1071
def fingerprint():
1072
    """Returns the device build fingerprint."""
1073
    return getprop('ro.build.fingerprint')
×
1074

1075
@with_device
1✔
1076
def product():
1077
    """Returns the device product identifier."""
1078
    return getprop('ro.build.product')
×
1079

1080
@with_device
1✔
1081
def build():
1082
    """Returns the Build ID of the device."""
1083
    return getprop('ro.build.id')
×
1084

1085
@with_device
1✔
1086
@no_emulator
1✔
1087
def unlock_bootloader():
1088
    """Unlocks the bootloader of the device.
1089

1090
    Note:
1091
        This requires physical interaction with the device.
1092
    """
1093
    w = log.waitfor("Unlocking bootloader")
×
1094
    with w:
×
1095
        if getprop('ro.oem_unlock_supported') == '0':
×
1096
            log.error("Bootloader cannot be unlocked: ro.oem_unlock_supported=0")
×
1097

1098
        if getprop('ro.boot.oem_unlock_support') == '0':
×
1099
            log.error("Bootloader cannot be unlocked: ro.boot.oem_unlock_support=0")
×
1100

1101
        if getprop('sys.oem_unlock_allowed') == '0':
×
1102
            log.error("Bootloader cannot be unlocked: Enable OEM Unlock in developer settings first", context.device)
×
1103

1104
        AdbClient().reboot_bootloader()
×
1105

1106
        # Check to see if it's unlocked before attempting unlock
1107
        unlocked = fastboot(['getvar', 'unlocked'])
×
1108
        if 'unlocked: yes' in unlocked:
×
1109
            w.success("Already unlocked")
×
1110
            fastboot(['continue'])
×
1111
            return
×
1112

1113
        fastboot(['oem', 'unlock'])
×
1114
        unlocked = fastboot(['getvar', 'unlocked'])
×
1115

1116
        fastboot(['continue'])
×
1117

1118
        if 'unlocked: yes' not in unlocked:
×
1119
            log.error("Unlock failed")
×
1120

1121
class Kernel(object):
1✔
1122
    _kallsyms = None
1✔
1123

1124
    @property
1✔
1125
    def address(self):
1126
        """Returns kernel address
1127
        Example:
1128

1129
        .. doctest::
1130
           :skipif: skip_android
1131

1132
            >>> hex(adb.kernel.address) # doctest: +ELLIPSIS
1133
            '0x...000'
1134
        """
1135
        return self.symbols['_text']
×
1136

1137
    @property
1✔
1138
    @context.quietfunc
1✔
1139
    def symbols(self):
1140
        """Returns a dictionary of kernel symbols"""
1141
        result = {}
×
1142
        for line in self.kallsyms.splitlines():
×
1143
            fields = line.split()
×
1144
            address = int(fields[0], 16)
×
1145
            name    = fields[-1]
×
1146
            result[name] = address
×
1147
        return result
×
1148

1149
    @property
1✔
1150
    @context.quietfunc
1✔
1151
    def kallsyms(self):
1152
        """Returns the raw output of kallsyms"""
1153
        if not self._kallsyms:
×
1154
            self._kallsyms = {}
×
1155
            root()
×
1156
            write('/proc/sys/kernel/kptr_restrict', '1')
×
1157
            self._kallsyms = read('/proc/kallsyms').decode('ascii')
×
1158
        return self._kallsyms
×
1159

1160
    @property
1✔
1161
    @context.quietfunc
1✔
1162
    def version(self):
1163
        """Returns the kernel version of the device."""
1164
        root()
×
1165
        return read('/proc/version').strip()
×
1166

1167
    @property
1✔
1168
    @context.quietfunc
1✔
1169
    def cmdline(self):
1170
        root()
×
1171
        return read('/proc/cmdline').strip()
×
1172

1173
    @property
1✔
1174
    @context.quietfunc
1✔
1175
    def lastmsg(self):
1176
        root()
×
1177
        if 'last_kmsg' in listdir('/proc'):
×
1178
            return read('/proc/last_kmsg')
×
1179

1180
        if 'console-ramoops' in listdir('/sys/fs/pstore/'):
×
1181
            return read('/sys/fs/pstore/console-ramoops')
×
1182

1183
    def enable_uart(self):
1✔
1184
        """Reboots the device with kernel logging to the UART enabled."""
1185
        model = getprop('ro.product.model')
×
1186

1187
        known_commands = {
×
1188
            'Nexus 4': None,
1189
            'Nexus 5': None,
1190
            'Nexus 6': 'oem config console enable',
1191
            'Nexus 5X': None,
1192
            'Nexus 6P': 'oem uart enable',
1193
            'Nexus 7': 'oem uart-on',
1194
        }
1195

1196
        with log.waitfor('Enabling kernel UART'):
×
1197

1198
            if model not in known_commands:
×
1199
                log.error("Device UART is unsupported.")
×
1200

1201
            command = known_commands[model]
×
1202

1203
            if command is None:
×
1204
                w.success('Always enabled')
×
1205
                return
×
1206

1207
            # Check the current commandline, it may already be enabled.
1208
            if any(s.startswith('console=tty') for s in self.cmdline.split()):
×
1209
                w.success("Already enabled")
×
1210
                return
×
1211

1212
            # Need to be root
1213
            with context.local(device=context.device):
×
1214
                # Save off the command line before rebooting to the bootloader
1215
                cmdline = kernel.cmdline
×
1216

1217
                reboot_bootloader()
×
1218

1219
                # Wait for device to come online
1220
                while context.device not in fastboot(['devices',' -l']):
×
1221
                    time.sleep(0.5)
×
1222

1223
                # Try the 'new' way
1224
                fastboot(command.split())
×
1225
                fastboot(['continue'])
×
1226
                wait_for_device()
×
1227

1228

1229
kernel = Kernel()
1✔
1230

1231
class Property(object):
1✔
1232
    def __init__(self, name=None):
1✔
1233
        # Need to avoid overloaded setattr() so we go through __dict__
1234
        self.__dict__['_name'] = name
1✔
1235

1236
    def __str__(self):
1✔
1237
        return str(getprop(self._name)).strip()
×
1238

1239
    def __getattr__(self, attr):
1✔
1240
        if attr.startswith('_'):
1!
1241
            raise AttributeError(attr)
1✔
1242
        if self._name:
×
1243
            attr = '%s.%s' % (self._name, attr)
×
1244
        return Property(attr)
×
1245

1246
    def __setattr__(self, attr, value):
1✔
1247
        if attr in self.__dict__:
×
1248
            return super(Property, self).__setattr__(attr, value)
×
1249

1250
        if self._name:
×
1251
            attr = '%s.%s' % (self._name, attr)
×
1252
        setprop(attr, value)
×
1253

1254
    def __eq__(self, other):
1✔
1255
        """Allow simple comparison
1256

1257
        Example:
1258

1259
        .. doctest::
1260
           :skipif: skip_android
1261

1262
            >>> adb.properties.ro.build.version.sdk == "24"
1263
            True
1264
        """
1265
        if isinstance(other, six.string_types):
×
1266
            return str(self) == other
×
1267
        return super(Property, self).__eq__(other)
×
1268

1269
    def __hash__(self, other):
1✔
1270
        # Allow hash indices matching on the property
1271
        return hash(self._name)
×
1272

1273
properties = Property()
1✔
1274

1275
def _build_date():
1✔
1276
    """Returns the build date in the form YYYY-MM-DD as a string"""
1277
    as_string = getprop('ro.build.date')
×
1278
    as_datetime =  dateutil.parser.parse(as_string)
×
1279
    return as_datetime.strftime('%Y-%b-%d')
×
1280

1281
def find_ndk_project_root(source):
1✔
1282
    '''Given a directory path, find the topmost project root.
1283

1284
    tl;dr "foo/bar/jni/baz.cpp" ==> "foo/bar"
1285
    '''
1286
    ndk_directory = os.path.abspath(source)
×
1287
    while ndk_directory != '/':
×
1288
        if os.path.exists(os.path.join(ndk_directory, 'jni')):
×
1289
            break
×
1290
        ndk_directory = os.path.dirname(ndk_directory)
×
1291
    else:
1292
        return None
×
1293

1294
    return ndk_directory
×
1295

1296
_android_mk_template = '''
1✔
1297
LOCAL_PATH := $(call my-dir)
1298

1299
include $(CLEAR_VARS)
1300
LOCAL_MODULE := %(local_module)s
1301
LOCAL_SRC_FILES := %(local_src_files)s
1302

1303
include $(BUILD_EXECUTABLE)
1304
'''.lstrip()
1305

1306
_application_mk_template = '''
1✔
1307
LOCAL_PATH := $(call my-dir)
1308

1309
include $(CLEAR_VARS)
1310
APP_ABI:= %(app_abi)s
1311
APP_PLATFORM:=%(app_platform)s
1312
'''.lstrip()
1313

1314
def _generate_ndk_project(file_list, abi='arm-v7a', platform_version=21):
1✔
1315
    # Create our project root
1316
    root = tempfile.mkdtemp()
×
1317

1318
    if not isinstance(file_list, (list, tuple)):
×
1319
        file_list = [file_list]
×
1320

1321
    # Copy over the source file(s)
1322
    jni_directory = os.path.join(root, 'jni')
×
1323
    os.mkdir(jni_directory)
×
1324
    for file in file_list:
×
1325
        shutil.copy(file, jni_directory)
×
1326

1327
    # Create the directories
1328

1329
    # Populate Android.mk
1330
    local_module = os.path.basename(file_list[0])
×
1331
    local_module, _ = os.path.splitext(local_module)
×
1332
    local_src_files = ' '.join(list(map(os.path.basename, file_list)))
×
1333
    Android_mk = os.path.join(jni_directory, 'Android.mk')
×
1334
    with open(Android_mk, 'w+') as f:
×
1335
        f.write(_android_mk_template % locals())
×
1336

1337
    # Populate Application.mk
1338
    app_abi = abi
×
1339
    app_platform = 'android-%s' % platform_version
×
1340
    Application_mk = os.path.join(jni_directory, 'Application.mk')
×
1341
    with open(Application_mk, 'w+') as f:
×
1342
        f.write(_application_mk_template % locals())
×
1343

1344
    return root
×
1345

1346
def compile(source):
1✔
1347
    r"""Compile a source file or project with the Android NDK.
1348

1349
    Example:
1350

1351
    .. doctest::
1352
       :skipif: skip_android
1353

1354
        >>> temp = tempfile.mktemp('.c')
1355
        >>> write(temp, '''
1356
        ... #include <stdio.h>
1357
        ... static char buf[4096];
1358
        ... int main() {
1359
        ...   FILE *fp = fopen("/proc/self/maps", "r");
1360
        ...   int n = fread(buf, 1, sizeof(buf), fp);
1361
        ...   fwrite(buf, 1, n, stdout);
1362
        ...   return 0;
1363
        ... }''')
1364
        >>> filename = adb.compile(temp)
1365
        >>> sent = adb.push(filename, "/data/local/tmp")
1366
        >>> adb.process(sent).recvall() # doctest: +ELLIPSIS
1367
        b'... /system/bin/linker\n...'
1368
    """
1369

1370
    ndk_build = misc.which('ndk-build')
×
1371
    if not ndk_build:
×
1372
        # Ensure that we can find the NDK.
1373
        ndk = os.environ.get('NDK', None)
×
1374
        if ndk is None:
×
1375
            log.error('$NDK must be set to the Android NDK directory')
×
1376
        ndk_build = os.path.join(ndk, 'ndk-build')
×
1377

1378
    # Determine whether the source is an NDK project or a single source file.
1379
    project = find_ndk_project_root(source)
×
1380

1381
    if not project:
×
1382
        # Realistically this should inherit from context.arch, but
1383
        # this works for now.
1384
        sdk = '21'
×
1385
        abi = {
×
1386
            'aarch64': 'arm64-v8a',
1387
            'amd64':   'x86_64',
1388
            'arm':     'armeabi-v7a',
1389
            'i386':    'x86',
1390
            'mips':    'mips',
1391
            'mips64':  'mips64',
1392
        }.get(context.arch, None)
1393

1394
        # If we have an attached device, use its settings.
1395
        if context.device:
×
1396
            abi = getprop('ro.product.cpu.abi')
×
1397
            sdk = getprop('ro.build.version.sdk')
×
1398

1399
        if abi is None:
×
1400
            log.error("Unknown CPU ABI")
×
1401

1402
        project = _generate_ndk_project(source, abi, sdk)
×
1403

1404
    # Remove any output files
1405
    lib = os.path.join(project, 'libs')
×
1406
    if os.path.exists(lib):
×
1407
        shutil.rmtree(lib)
×
1408

1409
    # Build the project
1410
    io = tubes.process.process(ndk_build, cwd=os.path.join(project, 'jni'))
×
1411

1412
    result = io.recvall()
×
1413

1414
    if 0 != io.poll():
×
1415
        log.error("Build failed:\n%s", result)
×
1416

1417
    # Find all of the output files
1418
    output = glob.glob(os.path.join(lib, '*', '*'))
×
1419

1420
    return output[0]
×
1421

1422
class Partition(object):
1✔
1423
    def __init__(self, path, name, blocks=0):
1✔
1424
        self.path = path
×
1425
        self.name = name
×
1426
        self.blocks = blocks
×
1427
        self.size = blocks * 1024
×
1428

1429
    @property
1✔
1430
    def data(self):
1431
        with log.waitfor('Fetching %r partition (%s)' % (self.name, self.path)):
×
1432
            return read(self.path)
×
1433

1434
@with_device
1✔
1435
def walk(top, topdown=True):
1✔
1436
    join = os.path.join
×
1437
    isdir = lambda x: stat.S_ISDIR(x['mode'])
×
1438
    client = AdbClient()
×
1439
    names = client.list(top)
×
1440

1441
    dirs, nondirs = [], []
×
1442
    for name, metadata in names.items():
×
1443
        if isdir(metadata):
×
1444
            dirs.append(name)
×
1445
        else:
1446
            nondirs.append(name)
×
1447

1448
    if topdown:
×
1449
        yield top, dirs, nondirs
×
1450
    for name in dirs:
×
1451
        new_path = join(top, name)
×
1452
        for x in walk(new_path, topdown):
×
1453
            yield x
×
1454
    if not topdown:
×
1455
        yield top, dirs, nondirs
×
1456

1457
@with_device
1✔
1458
def find(top, name):
1459
    for root, dirs, files in walk(top):
×
1460
        if name in files or name in dirs:
×
1461
            yield os.path.join(root, name)
×
1462

1463
@with_device
1✔
1464
def readlink(path):
1465
    path = process(['realpath', path]).recvall()
×
1466

1467
    # Readlink will emit a single newline
1468
    # We can't use the '-n' flag since old versions don't support it
1469
    if path.endswith(b'\n'):
×
1470
        path = path[:-1]
×
1471

1472
    return path.decode()
×
1473

1474
class Partitions(object):
1✔
1475
    """Enable access to partitions
1476

1477
    Example:
1478

1479
    .. doctest::
1480
       :skipif: skip_android
1481

1482
        >>> hex(adb.partitions.vda.size) # doctest: +ELLIPSIS
1483
        '0x...000'
1484
    """
1485
    @property
1✔
1486
    @context.quietfunc
1✔
1487
    def by_name_dir(self):
1488
        try:
×
1489
            return next(find('/dev/block/platform','by-name'))
×
1490
        except StopIteration:
×
1491
            return '/dev/block'
×
1492

1493
    @context.quietfunc
1✔
1494
    def __dir__(self):
1495
        return list(self)
×
1496

1497
    @context.quietfunc
1✔
1498
    def iter_proc_partitions(self):
1499
        for line in read('/proc/partitions').splitlines():
×
1500
            if not line.strip():
×
1501
                continue
×
1502
            major, minor, blocks, name = line.split(None, 4)
×
1503
            yield blocks, name.decode()
×
1504

1505
    @context.quietfunc
1✔
1506
    @with_device
1✔
1507
    def __iter__(self):
1508
        root()
×
1509

1510
        # Find all named partitions
1511
        names = set(listdir(self.by_name_dir))
×
1512

1513
        # Find all unnamed partitions
1514
        for _, name in self.iter_proc_partitions():
×
1515
            names.add(name)
×
1516
        return iter(names)
×
1517

1518
    def __getattr__(self, attr):
1✔
1519
        if attr.startswith('_'):
×
1520
            raise AttributeError(attr)
×
1521

1522
        for name in self:
×
1523
            if name == attr:
×
1524
                break
×
1525
        else:
1526
            raise AttributeError("No partition %r" % attr)
×
1527

1528
        path = os.path.join(self.by_name_dir, name)
×
1529

1530
        with context.quiet:
×
1531
            # Find the actual path of the device
1532
            devpath = readlink(path)
×
1533
            devname = os.path.basename(devpath)
×
1534

1535
            # Get the size of the partition
1536
            for blocks, name in self.iter_proc_partitions():
×
1537
                if name in (devname, attr):
×
1538
                    break
×
1539
            else:
1540
                log.error("Could not find size of partition %r" % name)
×
1541

1542
        return Partition(devpath, attr, int(blocks))
×
1543

1544
partitions = Partitions()
1✔
1545

1546
def install(apk, *arguments):
1✔
1547
    """Install an APK onto the device.
1548

1549
    This is a wrapper around 'pm install', which backs 'adb install'.
1550

1551
    Arguments:
1552
        apk(str): Path to the APK to intall (e.g. ``'foo.apk'``)
1553
        arguments: Supplementary arguments to 'pm install',
1554
            e.g. ``'-l', '-g'``.
1555
    """
1556
    if not apk.endswith('.apk'):
×
1557
        log.error("APK must have .apk extension")
×
1558

1559
    basename = os.path.basename(apk)
×
1560
    target_path = '/data/local/tmp/{}.apk'.format(basename)
×
1561

1562
    with log.progress("Installing APK {}".format(basename)) as p:
×
1563
        with context.quiet:
×
1564
            p.status('Copying APK to device')
×
1565
            push(apk, target_path)
×
1566

1567
            p.status('Installing')
×
1568
            result = process(['pm', 'install-create', target_path] + list(arguments)).recvall()
×
1569

1570
            status = result.splitlines()[-1]
×
1571
            if 'Success' not in status:
×
1572
                log.error(status)
×
1573

1574
def uninstall(package, *arguments):
1✔
1575
    """Uninstall an APK from the device.
1576

1577
    This is a wrapper around 'pm uninstall', which backs 'adb uninstall'.
1578

1579
    Arguments:
1580
        package(str): Name of the package to uninstall (e.g. ``'com.foo.MyPackage'``)
1581
        arguments: Supplementary arguments to ``'pm install'``, e.g. ``'-k'``.
1582
    """
1583
    with log.progress("Uninstalling package {}".format(package)):
×
1584
        with context.quiet:
×
1585
            return process(['pm','uninstall',package] + list(arguments)).recvall()
×
1586

1587
@context.quietfunc
1✔
1588
def packages():
1589
    """Returns a list of packages installed on the system"""
1590
    packages = process(['pm', 'list', 'packages']).recvall()
×
1591
    return [line.split('package:', 1)[-1] for line in packages.splitlines()]
×
1592

1593
@context.quietfunc
1✔
1594
def version():
1595
    """Returns rthe platform version as a tuple."""
1596
    prop = getprop('ro.build.version.release')
×
1597
    return [int(v) for v in prop.split('.')]
×
1598

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc