Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

meejah / txtorcon / 1239

10 Dec 2018 - 9:20 coverage decreased (-0.02%) to 99.936%
1239

Pull #329

travis-ci

9181eb84f9c35729a3bad740fb7f9d93?size=18&default=identiconweb-flow
release note
Pull Request #329: Ticket293 purpose circ

2 of 3 new or added lines in 1 file covered. (66.67%)

4668 of 4671 relevant lines covered (99.94%)

15.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.75
/txtorcon/torstate.py
1
# -*- coding: utf-8 -*-
2

3
from __future__ import absolute_import
16×
4
from __future__ import print_function
16×
5
from __future__ import with_statement
16×
6

7
import collections
16×
8
import os
16×
9
import stat
16×
10
import warnings
16×
11

12
from twisted.internet import defer
16×
13
from twisted.python.failure import Failure
16×
14
from twisted.internet.endpoints import TCP4ClientEndpoint
16×
15
from twisted.internet.endpoints import UNIXClientEndpoint
16×
16
from twisted.internet.interfaces import IReactorCore
16×
17
from twisted.internet.interfaces import IStreamClientEndpoint
16×
18
from zope.interface import implementer
16×
19

20
from txtorcon.torcontrolprotocol import TorProtocolFactory
16×
21
from txtorcon.stream import Stream
16×
22
from txtorcon.circuit import Circuit, _extract_reason
16×
23
from txtorcon.router import Router, hashFromHexId
16×
24
from txtorcon.addrmap import AddrMap
16×
25
from txtorcon.torcontrolprotocol import parse_keywords
16×
26
from txtorcon.log import txtorlog
16×
27
from txtorcon.torcontrolprotocol import TorProtocolError
16×
28

29
from txtorcon.interface import ITorControlProtocol
16×
30
from txtorcon.interface import IRouterContainer
16×
31
from txtorcon.interface import ICircuitListener
16×
32
from txtorcon.interface import ICircuitContainer
16×
33
from txtorcon.interface import IStreamListener
16×
34
from txtorcon.interface import IStreamAttacher
16×
35
from ._microdesc_parser import MicrodescriptorParser
16×
36
from .router import hexIdFromHash
16×
37
from .util import maybe_coroutine
16×
38

39

40
def _build_state(proto):
16×
41
    state = TorState(proto)
16×
42
    return state.post_bootstrap
16×
43

44

45
def _wait_for_proto(proto):
16×
46
    return proto.post_bootstrap
16×
47

48

49
def build_tor_connection(connection, build_state=True, wait_for_proto=True,
16×
50
                         password_function=lambda: None):
51
    """
52
    This is used to build a valid TorState (which has .protocol for
53
    the TorControlProtocol). For example::
54

55
        from twisted.internet import reactor
56
        from twisted.internet.endpoints import TCP4ClientEndpoint
57
        import txtorcon
58

59
        def example(state):
60
            print "Fully bootstrapped state:",state
61
            print "   with bootstrapped protocol:",state.protocol
62

63
        d = txtorcon.build_tor_connection(TCP4ClientEndpoint(reactor,
64
                                                             "localhost",
65
                                                             9051))
66
        d.addCallback(example)
67
        reactor.run()
68

69
    :param password_function:
70
        See :class:`txtorcon.TorControlProtocol`
71

72
    :param build_state:
73
        If True (the default) a TorState object will be
74
        built as well. If False, just a TorControlProtocol will be
75
        returned via the Deferred.
76

77
    :return:
78
        a Deferred that fires with a TorControlProtocol or, if you
79
        specified build_state=True, a TorState. In both cases, the
80
        object has finished bootstrapping
81
        (i.e. TorControlProtocol.post_bootstrap or
82
        TorState.post_bootstap has fired, as needed)
83
    """
84

85
    if IStreamClientEndpoint.providedBy(connection):
16×
86
        endpoint = connection
16×
87

88
    elif isinstance(connection, tuple):
16×
89
        if len(connection) == 2:
16×
90
            reactor, socket = connection
16×
91
            if (os.path.exists(socket) and
16×
92
                os.stat(socket).st_mode & (stat.S_IRGRP | stat.S_IRUSR |
93
                                           stat.S_IROTH)):
94
                endpoint = UNIXClientEndpoint(reactor, socket)
16×
95
            else:
96
                raise ValueError('Can\'t use "%s" as a socket' % (socket, ))
16×
97
        elif len(connection) == 3:
16×
98
            endpoint = TCP4ClientEndpoint(*connection)
16×
99
        else:
100
            raise TypeError('Expected either a (reactor, socket)- or a '
16×
101
                            '(reactor, host, port)-tuple for argument '
102
                            '"connection", got %s' % (connection, ))
103
    else:
104
        raise TypeError('Expected a (reactor, socket)- or a (reactor, host, '
16×
105
                        'port)-tuple or an object implementing IStreamClient'
106
                        'Endpoint for argument "connection", got %s' %
107
                        (connection, ))
108

109
    d = endpoint.connect(
16×
110
        TorProtocolFactory(
111
            password_function=password_function
112
        )
113
    )
114
    if build_state:
16×
115
        d.addCallback(build_state
16×
116
                      if isinstance(build_state, collections.Callable)
117
                      else _build_state)
118
    elif wait_for_proto:
16×
119
        d.addCallback(wait_for_proto
16×
120
                      if isinstance(wait_for_proto, collections.Callable)
121
                      else _wait_for_proto)
122
    return d
16×
123

124

125
def build_local_tor_connection(reactor, host='127.0.0.1', port=9051,
16×
126
                               socket='/var/run/tor/control', *args, **kwargs):
127
    """
128
    This builds a connection to a local Tor, either via 127.0.0.1:9051
129
    or /var/run/tor/control (by default; the latter is tried
130
    first). See also :meth:`build_tor_connection
131
    <txtorcon.torstate.build_tor_connection>` for other key-word
132
    arguments that are accepted here also.
133

134
    **Note**: new code should use :meth:`txtorcon.connect` instead.
135

136
    :param host:
137
        An IP address to find Tor at. Corresponds to the
138
        ControlListenAddress torrc option.
139

140
    :param port:
141
        The port to use with the address when trying to contact
142
        Tor. This corresponds to the ControlPort option in torrc
143
        (default is 9051).
144
    """
145

146
    try:
16×
147
        return build_tor_connection((reactor, socket), *args, **kwargs)
16×
148
    except Exception:
16×
149
        return build_tor_connection((reactor, host, port), *args, **kwargs)
16×
150

151

152
def flags_from_dict(kw):
16×
153
    """
154
    This turns a dict with keys that are flags (e.g. for CLOSECIRCUIT,
155
    CLOSESTREAM) only if the values are true.
156
    """
157

158
    if len(kw) == 0:
16×
159
        return ''
16×
160

161
    flags = ''
16×
162
    for (k, v) in kw.items():
16×
163
        if v:
16×
164
            flags += ' ' + str(k)
16×
165
    # note that we want the leading space if there's at least one
166
    # flag.
167
    return flags
16×
168

169

170
@implementer(ICircuitListener)
16×
171
@implementer(ICircuitContainer)
16×
172
@implementer(IRouterContainer)
16×
173
@implementer(IStreamListener)
16×
174
class TorState(object):
16×
175
    """
176
    This tracks the current state of Tor using a TorControlProtocol.
177

178
    On setup it first queries the initial state of streams and
179
    circuits. It then asks for updates via the listeners. It requires
180
    an ITorControlProtocol instance. The control protocol doesn't need
181
    to be bootstrapped yet. The Deferred .post_boostrap is driggered
182
    when the TorState instance is fully ready to go.  The easiest way
183
    is to use the helper method
184
    :func:`txtorcon.build_tor_connection`. For details, see the
185
    implementation of that.
186

187
    You may add an :class:`txtorcon.interface.IStreamAttacher` to
188
    provide a custom mapping for Strams to Circuits (by default Tor
189
    picks by itself).
190

191
    This is also a good example of the various listeners, and acts as
192
    an :class:`txtorcon.interface.ICircuitContainer` and
193
    :class:`txtorcon.interface.IRouterContainer`.
194

195
    :cvar DO_NOT_ATTACH:
196
        Constant to return from an IAttacher indicating you don't want to
197
        attach this stream at all.
198

199
    """
200

201
    @classmethod
16×
202
    def from_protocol(cls, protocol, **kw):
203
        '''
204
        Create a new, boot-strapped TorState from a TorControlProtocol
205
        instance.
206

207
        :return: a Deferred that fires with a TorState instance
208
        '''
209
        state = TorState(protocol, bootstrap=True)
16×
210
        return state.post_bootstrap
16×
211

212
    def __init__(self, protocol, bootstrap=True):
16×
213
        self.protocol = ITorControlProtocol(protocol)
16×
214
        # fixme could use protocol.on_disconnect to re-connect; see issue #3
215

216
        # could override these to get your own Circuit/Stream subclasses
217
        # to track these things
218
        self.circuit_factory = Circuit
16×
219
        self.stream_factory = Stream
16×
220

221
        self._attacher = None
16×
222
        """If set, provides
223
        :class:`txtorcon.interface.IStreamAttacher` to attach new
224
        streams we hear about."""
225

226
        self.tor_binary = 'tor'
16×
227

228
        self.circuit_listeners = []
16×
229
        self.stream_listeners = []
16×
230

231
        self.addrmap = AddrMap()
16×
232
        #: keys on id (integer)
233
        self.circuits = {}
16×
234

235
        #: keys on id (integer)
236
        self.streams = {}
16×
237

238
        #: list of unique routers
239
        self.all_routers = set()
16×
240

241
        #: keys by hexid (string) and by unique names
242
        self.routers = {}
16×
243
        self._old_routers = {}
16×
244

245
        #: keys on name, value always list (many duplicate "Unnamed"
246
        #: routers, for example)
247
        self.routers_by_name = {}
16×
248

249
        #: keys by hexid (string)
250
        self.routers_by_hash = {}
16×
251

252
        #: potentially-usable as entry guards, I think? (any router
253
        #: with 'Guard' flag)
254
        self.guards = {}
16×
255

256
        #: from GETINFO entry-guards, our current entry guards
257
        self.entry_guards = {}
16×
258

259
        #: list of entry guards we didn't parse out
260
        self.unusable_entry_guards = []
16×
261

262
        #: keys by name
263
        self.authorities = {}
16×
264

265
        #: see set_attacher
266
        self._cleanup = None
16×
267

268
        self._network_status_parser = MicrodescriptorParser(self._create_router)
16×
269

270
        self.post_bootstrap = defer.Deferred()
16×
271
        if bootstrap:
16×
272
            self.protocol.post_bootstrap.addCallback(self._bootstrap)
16×
273
            self.protocol.post_bootstrap.addErrback(self.post_bootstrap.errback)
16×
274

275
    def _create_router(self, **kw):
16×
276
        id_hex = hexIdFromHash(kw['idhash'])
16×
277
        try:
16×
278
            router = self._old_routers[id_hex]
16×
279
        except KeyError:
16×
280
            router = Router(self.protocol)
16×
281

282
        self.routers[id_hex] = router
16×
283
        router.from_consensus = True
16×
284
        router.update(
16×
285
            kw['nickname'],
286
            kw['idhash'],
287
            kw['orhash'],
288
            kw['modified'],
289
            kw['ip'],
290
            kw['orport'],
291
            kw['dirport'],
292
        )
293
        router.flags = kw.get('flags', [])
16×
294
        if 'bandwidth' in kw:
16×
295
            router.bandwidth = kw['bandwidth']
16×
296
        if 'ip_v6' in kw:
16×
297
            router.ip_v6.extend(kw['ip_v6'])
16×
298

299
        if 'guard' in router.flags:
16×
300
            self.guards[router.id_hex] = router
16×
301
        if 'authority' in router.flags:
16×
302
            self.authorities[router.name] = router
16×
303

304
        if router.name in self.routers:
16×
305
            self.routers[router.name] = None
16×
306

307
        else:
308
            self.routers[router.name] = router
16×
309

310
        if router.name in self.routers_by_name:
16×
311
            self.routers_by_name[router.name].append(router)
16×
312

313
        else:
314
            self.routers_by_name[router.name] = [router]
16×
315

316
        self.routers[router.id_hex] = router
16×
317
        self.routers_by_hash[router.id_hex] = router
16×
318
        self.all_routers.add(router)
16×
319

320
    @defer.inlineCallbacks
16×
321
    def _bootstrap(self, arg=None):
16×
322
        "This takes an arg so we can use it as a callback (see __init__)."
323

324
        # update list of routers (must be before we do the
325
        # circuit-status)
326

327
        # look out! we're depending on get_info_incremental returning
328
        # *lines*, which isn't documented -- but will be true because
329
        # TorControlProtocol is a LineReceiver...
330
        yield self.protocol.get_info_incremental(
16×
331
            'ns/all',
332
            self._network_status_parser.feed_line,
333
        )
334
        self._network_status_parser.done()
16×
335

336
        # update list of existing circuits
337
        cs = yield self.protocol.get_info_raw('circuit-status')
16×
338
        self._circuit_status(cs)
16×
339

340
        # update list of streams
341
        ss = yield self.protocol.get_info_raw('stream-status')
16×
342
        self._stream_status(ss)
16×
343

344
        # update list of existing address-maps
345
        key = 'address-mappings/all'
16×
346
        am = yield self.protocol.get_info_raw(key)
16×
347
        # strip addressmappsings/all= and OK\n from raw data
348
        am = am[len(key) + 1:]
16×
349
        for line in am.split('\n'):
16×
350
            if len(line.strip()) == 0:
16×
351
                continue            # FIXME
16×
352
            self.addrmap.update(line)
16×
353

354
        yield self._add_events()
16×
355

356
        entries = yield self.protocol.get_info_raw("entry-guards")
16×
357
        for line in entries.split('\n')[1:]:
16×
358
            if len(line.strip()) == 0 or line.strip() == 'OK':
16×
359
                # XXX does this ever really happen?
360
                continue
16×
361
            args = line.split()
16×
362
            (name, status) = args[:2]
16×
363
            name = name[:41]
16×
364

365
            # this is sometimes redundant, as a missing entry guard
366
            # usually means it won't be in our list of routers right
367
            # now, but just being on the safe side
368
            if status.lower() != 'up':
16×
369
                self.unusable_entry_guards.append(line)
16×
370
                continue
16×
371

372
            try:
16×
373
                self.entry_guards[name] = self.router_from_id(name)
16×
374
            except KeyError:
16×
375
                self.unusable_entry_guards.append(line)
16×
376

377
        # in case process/pid doesn't exist and we don't know the PID
378
        # because we own it, we just leave it as 0 (previously
379
        # guessed using psutil, but that only works if there's
380
        # exactly one tor running anyway)
381
        try:
16×
382
            pid = yield self.protocol.get_info_raw("process/pid")
16×
383
        except TorProtocolError:
16×
384
            pid = None
16×
385
        self.tor_pid = 0
16×
386
        if pid:
16×
387
            try:
16×
388
                pid = parse_keywords(pid)['process/pid']
16×
389
                self.tor_pid = int(pid)
16×
390
            except Exception:  # fixme: ValueError and KeyError ..?
16×
391
                self.tor_pid = 0
16×
392
        if not self.tor_pid and self.protocol.is_owned:
16×
393
            self.tor_pid = self.protocol.is_owned
16×
394

395
        self.post_bootstrap.callback(self)
16×
396
        self.post_boostrap = None
16×
397

398
    # XXX this should be hidden as _undo_attacher
399
    def undo_attacher(self):
16×
400
        """
401
        Shouldn't Tor handle this by turning this back to 0 if the
402
        controller that twiddled it disconnects?
403
        """
404

405
        return self.protocol.set_conf("__LeaveStreamsUnattached", 0)
16×
406

407
    def set_attacher(self, attacher, myreactor):
16×
408
        """
409
        Provide an :class:`txtorcon.interface.IStreamAttacher` to
410
        associate streams to circuits.
411

412
        You are Strongly Encouraged to **not** use this API directly,
413
        and instead use :meth:`txtorcon.Circuit.stream_via` or
414
        :meth:`txtorcon.Circuit.web_agent` instead. If you do need to
415
        use this API, it's an error if you call either of the other
416
        two methods.
417

418
        This won't get turned on until after bootstrapping is
419
        completed. ('__LeaveStreamsUnattached' needs to be set to '1'
420
        and the existing circuits list needs to be populated).
421
        """
422

423
        react = IReactorCore(myreactor)
16×
424
        if attacher:
16×
425
            if self._attacher is attacher:
16×
426
                return
16×
427
            if self._attacher is not None:
16×
428
                raise RuntimeError(
16×
429
                    "set_attacher called but we already have an attacher"
430
                )
431
            self._attacher = IStreamAttacher(attacher)
16×
432
        else:
433
            self._attacher = None
16×
434

435
        if self._attacher is None:
16×
436
            d = self.undo_attacher()
16×
437
            if self._cleanup:
16×
438
                react.removeSystemEventTrigger(self._cleanup)
16×
439
                self._cleanup = None
16×
440

441
        else:
442
            d = self.protocol.set_conf("__LeaveStreamsUnattached", "1")
16×
443
            self._cleanup = react.addSystemEventTrigger(
16×
444
                'before', 'shutdown',
445
                self.undo_attacher,
446
            )
447
        return d
16×
448

449
    # noqa
450
    stream_close_reasons = {
16×
451
        'REASON_MISC': 1,               # (catch-all for unlisted reasons)
452
        'REASON_RESOLVEFAILED': 2,      # (couldn't look up hostname)
453
        'REASON_CONNECTREFUSED': 3,     # (remote host refused connection) [*]
454
        'REASON_EXITPOLICY': 4,         # (OR refuses to connect to host or port)
455
        'REASON_DESTROY': 5,            # (Circuit is being destroyed)
456
        'REASON_DONE': 6,               # (Anonymized TCP connection was closed)
457
        'REASON_TIMEOUT': 7,            # (Connection timed out, or OR timed out while connecting)
458
        'REASON_NOROUTE': 8,            # (Routing error while attempting to contact destination)
459
        'REASON_HIBERNATING': 9,        # (OR is temporarily hibernating)
460
        'REASON_INTERNAL': 10,          # (Internal error at the OR)
461
        'REASON_RESOURCELIMIT': 11,     # (OR has no resources to fulfill request)
462
        'REASON_CONNRESET': 12,         # (Connection was unexpectedly reset)
463
        'REASON_TORPROTOCOL': 13,       # (Sent when closing connection because of Tor protocol violations.)
464
        'REASON_NOTDIRECTORY': 14}      # (Client sent RELAY_BEGIN_DIR to a non-directory relay.)
465

466
    def close_stream(self, stream, reason='REASON_MISC', **kwargs):
16×
467
        """
468
        This sends a STREAMCLOSE command, using the specified reason
469
        (either an int or one of the 14 strings in section 6.3 of
470
        tor-spec.txt if the argument is a string). Any kwards are
471
        passed through as flags if they evaluated to true
472
        (e.g. "SomeFlag=True"). Currently there are none that Tor accepts.
473
        """
474

475
        if type(stream) != int:
16×
476
            # assume it's a Stream instance
477
            stream = stream.id
16×
478
        try:
16×
479
            reason = int(reason)
16×
480
        except ValueError:
16×
481
            try:
16×
482
                reason = TorState.stream_close_reasons[reason]
16×
483
            except KeyError:
16×
484
                raise ValueError(
16×
485
                    'Unknown stream close reason "%s"' % str(reason)
486
                )
487

488
        flags = flags_from_dict(kwargs)
16×
489

490
        # stream is now an ID no matter what we passed in
491
        cmd = 'CLOSESTREAM %d %d%s' % (stream, reason, flags)
16×
492
        return self.protocol.queue_command(cmd)
16×
493

494
    def close_circuit(self, circid, **kwargs):
16×
495
        """
496
        This sends a CLOSECIRCUIT command, using any keyword arguments
497
        passed as the Flags (currently, that is just 'IfUnused' which
498
        means to only close the circuit when it is no longer used by
499
        any streams).
500

501
        :param circid:
502
            Either a circuit-id (int) or a Circuit instance
503

504
        :return:
505
            a Deferred which callbacks with the result of queuing the
506
            command to Tor (usually "OK"). If you want to instead know
507
            when the circuit is actually-gone, see
508
            :meth:`Circuit.close <txtorcon.circuit.Circuit.close>`
509
        """
510

511
        if type(circid) != int:
16×
512
            # assume it's a Circuit instance
513
            circid = circid.id
16×
514
        flags = flags_from_dict(kwargs)
16×
515
        return self.protocol.queue_command(
16×
516
            'CLOSECIRCUIT %s%s' % (circid, flags)
517
        )
518

519
    def add_circuit_listener(self, icircuitlistener):
16×
520
        """
521
        Adds a new instance of :class:`txtorcon.interface.ICircuitListener` which
522
        will receive updates for all existing and new circuits.
523
        """
524
        listen = ICircuitListener(icircuitlistener)
16×
525
        for circ in self.circuits.values():
16×
526
            circ.listen(listen)
16×
527
        self.circuit_listeners.append(listen)
16×
528

529
    def add_stream_listener(self, istreamlistener):
16×
530
        """
531
        Adds a new instance of :class:`txtorcon.interface.IStreamListener` which
532
        will receive updates for all existing and new streams.
533
        """
534
        listen = IStreamListener(istreamlistener)
16×
535
        for stream in self.streams.values():
16×
536
            stream.listen(listen)
16×
537
        self.stream_listeners.append(listen)
16×
538

539
    def _find_circuit_after_extend(self, x):
16×
540
        ex, circ_id = x.split()
16×
541
        if ex != 'EXTENDED':
16×
542
            raise RuntimeError('Expected EXTENDED, got "%s"' % x)
16×
543
        circ_id = int(circ_id)
16×
544
        circ = self._maybe_create_circuit(circ_id)
16×
545
        circ.update([str(circ_id), 'EXTENDED'])
16×
546
        return circ
16×
547

548
    def build_circuit(self, routers=None, using_guards=True, purpose=None):
16×
549
        """
550
        Builds a circuit consisting of exactly the routers specified,
551
        in order.  This issues an EXTENDCIRCUIT call to Tor with all
552
        the routers specified.
553

554
        :param routers: a list of Router instances which is the path
555
            desired. To allow Tor to choose the routers itself, pass
556
            None (the default) for routers.
557

558
        :param using_guards: A warning is issued if the first router
559
            isn't in self.entry_guards.
560

561
        :return:
562
            A Deferred that will callback with a Circuit instance
563
            (with the .id member being valid, and probably nothing
564
            else).
565
        """
566

567
        if routers is None or routers == []:
16×
568
            cmd = "EXTENDCIRCUIT 0"
16×
569

570
        else:
571
            if using_guards and routers[0] not in self.entry_guards.values():
16×
572
                warnings.warn(
16×
573
                    "Circuit doesn't start with a guard: %s" % routers,
574
                    RuntimeWarning
575
                )
576
            cmd = "EXTENDCIRCUIT 0 "
16×
577
            first = True
16×
578
            for router in routers:
16×
579
                if first:
16×
580
                    first = False
16×
581
                else:
582
                    cmd += ','
16×
583
                # XXX should we really accept bytes here?
584
                if isinstance(router, bytes) and len(router) == 40 \
16×
585
                   and hashFromHexId(router):
586
                    cmd += router.decode('utf8')
16×
587
                else:
588
                    cmd += router.id_hex[1:]
16×
589

590
            if purpose is not None:
16×
NEW
591
                cmd += " purpose={}".format(purpose)
!
592
        d = self.protocol.queue_command(cmd)
16×
593
        d.addCallback(self._find_circuit_after_extend)
16×
594
        return d
16×
595

596
    DO_NOT_ATTACH = object()
16×
597

598
    # @defer.inlineCallbacks  (this method is async, be nice to mark it ...)
599
    def _maybe_attach(self, stream):
16×
600
        """
601
        If we've got a custom stream-attachment instance (see
602
        set_attacher) this will ask it for the appropriate
603
        circuit. Note that we ignore .exit URIs and let Tor deal with
604
        those (by passing circuit ID 0).
605

606
        The stream attacher is allowed to return a Deferred which will
607
        callback with the desired circuit.
608

609
        You may return the special object DO_NOT_ATTACH which will
610
        cause the circuit attacher to simply ignore the stream
611
        (neither attaching it, nor telling Tor to attach it).
612
        """
613

614
        if self._attacher is None:
16×
615
            return None
16×
616

617
        if stream.target_host is not None \
16×
618
           and '.exit' in stream.target_host:
619
            # we want to totally ignore .exit URIs as these are
620
            # used to specify a particular exit node, and trying
621
            # to do STREAMATTACH on them will fail with an error
622
            # from Tor anyway.
623
            txtorlog.msg("ignore attacher:", stream)
16×
624
            return
16×
625

626
        # handle async or sync .attach() the same
627
        circ_d = defer.maybeDeferred(
16×
628
            self._attacher.attach_stream,
629
            stream, self.circuits,
630
        )
631
        circ_d.addCallback(maybe_coroutine)
16×
632

633
        # actually do the attachment logic; .attach() can return 3 things:
634
        #    1. None: let Tor do whatever it wants
635
        #    2. DO_NOT_ATTACH: don't attach the stream at all
636
        #    3. Circuit instance: attach to the provided circuit
637
        def issue_stream_attach(circ):
16×
638
            txtorlog.msg("circuit:", circ)
16×
639
            if circ is None or circ is TorState.DO_NOT_ATTACH:
16×
640
                # tell Tor to do what it likes
641
                return self.protocol.queue_command(
16×
642
                    u"ATTACHSTREAM {} 0".format(stream.id).encode("ascii")
643
                )
644

645
            else:
646
                # should get a Circuit instance; check it for suitability
647
                if not isinstance(circ, Circuit):
16×
648
                    raise RuntimeError(
16×
649
                        "IStreamAttacher.attach() must return a Circuit instance "
650
                        "(or None or DO_NOT_ATTACH): %s"
651
                    )
652
                if circ.id not in self.circuits:
16×
653
                    raise RuntimeError(
16×
654
                        "Attacher returned a circuit unknown to me."
655
                    )
656
                if circ.state != 'BUILT':
16×
657
                    raise RuntimeError(
16×
658
                        "Can only attach to BUILT circuits; %d is in %s." %
659
                        (circ.id, circ.state)
660
                    )
661
                # we've got a valid Circuit instance; issue the command
662
                return self.protocol.queue_command(
16×
663
                    u"ATTACHSTREAM {} {}".format(stream.id, circ.id).encode("ascii")
664
                )
665

666
        circ_d.addCallback(issue_stream_attach)
16×
667
        circ_d.addErrback(self._attacher_error)
16×
668
        return circ_d
16×
669

670
    def _attacher_error(self, fail):
16×
671
        """
672
        not ideal, but there's not really a good way to let the caller
673
        handler errors :/ since we ultimately call this due to an
674
        async request from Tor. Mostly these errors will be logic or
675
        syntax errors in the caller's code anyway.
676

677
        tests monkey-patch this to reduce spew
678
        """
679
        print("Failure while attaching stream:", fail)
16×
680
        return fail
16×
681

682
    def _circuit_status(self, data):
16×
683
        """Used internally as a callback for updating Circuit information"""
684

685
        data = data[len('circuit-status='):].split('\n')
16×
686
        # sometimes there's a newline after circuit-status= and
687
        # sometimes not, so we get rid of it
688
        if len(data) and len(data[0].strip()) == 0:
16×
689
            data = data[1:]
16×
690

691
        for line in data:
16×
692
            self._circuit_update(line)
16×
693

694
    def _stream_status(self, data):
16×
695
        "Used internally as a callback for updating Stream information"
696
        # there's a slight issue with a single-stream vs >= 2 streams,
697
        # in that in the latter case we have a line by itself with
698
        # "stream-status=" on it followed by the streams EXCEPT in the
699
        # single-stream case which has "stream-status=123 blahblah"
700
        # (i.e. the key + value on one line)
701

702
        lines = data.split('\n')
16×
703
        if len(lines) == 1:
16×
704
            d = lines[0][len('stream-status='):]
16×
705
            # if there are actually 0 streams, then there's nothing
706
            # left to parse
707
            if len(d):
16×
708
                self._stream_update(d)
16×
709
        else:
710
            [self._stream_update(line) for line in lines[1:]]
16×
711

712
    def _update_network_status(self, data):
16×
713
        """
714
        Used internally as a callback for updating Router information
715
        from NEWCONSENSUS events.
716
        """
717

718
        # XXX why are we ever getting this with 0 data?
719
        if len(data):
16×
720
            self._old_routers = self.routers
16×
721
            self.routers = dict()
16×
722
            self.all_routers = set()
16×
723
            self.routers_by_hash = dict()
16×
724
            self.routers_by_name = dict()
16×
725
            for line in data.split('\n'):
16×
726
                self._network_status_parser.feed_line(line)
16×
727
            self._network_status_parser.done()
16×
728

729
        txtorlog.msg(len(self.routers_by_name), "named routers found.")
16×
730
        # remove any names we added that turned out to have dups
731
        remove_keys = set()
16×
732
        for (k, v) in self.routers.items():
16×
733
            if v is None:
16×
734
                txtorlog.msg(len(self.routers_by_name[k]), "dups:", k)
16×
735
                remove_keys.add(k)
16×
736
        for k in remove_keys:
16×
737
            del self.routers[k]
16×
738

739
        txtorlog.msg(len(self.guards), "GUARDs")
16×
740

741
    def _maybe_create_circuit(self, circ_id):
16×
742
        if circ_id not in self.circuits:
16×
743
            c = self.circuit_factory(self)
16×
744
            c.listen(self)
16×
745
            for listener in self.circuit_listeners:
16×
746
                c.listen(listener)
16×
747

748
        else:
749
            c = self.circuits[circ_id]
16×
750
        return c
16×
751

752
    def _circuit_update(self, line):
16×
753
        """
754
        Used internally as a callback to update Circuit information
755
        from CIRC events.
756
        """
757

758
        # print("circuit_update", line)
759
        args = line.split()
16×
760
        circ_id = int(args[0])
16×
761

762
        c = self._maybe_create_circuit(circ_id)
16×
763
        c.update(args)
16×
764

765
    def _stream_update(self, line):
16×
766
        """
767
        Used internally as a callback to update Stream information
768
        from STREAM events.
769
        """
770

771
        if line.strip() == 'stream-status=':
16×
772
            # this happens if there are no active streams
773
            return
16×
774

775
        args = line.split()
16×
776
        assert len(args) >= 3
16×
777

778
        stream_id = int(args[0])
16×
779
        wasnew = False
16×
780
        if stream_id not in self.streams:
16×
781
            stream = self.stream_factory(self, self.addrmap)
16×
782
            self.streams[stream_id] = stream
16×
783
            stream.listen(self)
16×
784
            for x in self.stream_listeners:
16×
785
                stream.listen(x)
16×
786
            wasnew = True
16×
787
        self.streams[stream_id].update(args)
16×
788

789
        # if the update closed the stream, it won't be in our list
790
        # anymore. FIXME: how can we ever hit such a case as the
791
        # first update being a CLOSE?
792
        if wasnew and stream_id in self.streams:
16×
793
            self._maybe_attach(self.streams[stream_id])
16×
794

795
    def _addr_map(self, addr):
16×
796
        "Internal callback to update DNS cache. Listens to ADDRMAP."
797
        txtorlog.msg(" --> addr_map", addr)
16×
798
        self.addrmap.update(addr)
16×
799

800
    event_map = {
16×
801
        'STREAM': '_stream_update',
802
        'CIRC': '_circuit_update',
803
        'NEWCONSENSUS': '_update_network_status',
804
        'ADDRMAP': '_addr_map',
805
    }
806

807
    @defer.inlineCallbacks
16×
808
    def _add_events(self):
809
        """
810
        Add listeners for all the events the controller is interested in.
811
        """
812

813
        for (event, func_name) in self.event_map.items():
16×
814
            yield self.protocol.add_event_listener(
16×
815
                event,
816
                getattr(self, func_name),
817
            )
818

819
    # ICircuitContainer
820

821
    def find_circuit(self, circid):
16×
822
        "ICircuitContainer API"
823
        return self.circuits[circid]
16×
824

825
    # IRouterContainer
826

827
    def router_from_id(self, routerid):
16×
828
        """IRouterContainer API"""
829

830
        try:
16×
831
            return self.routers[routerid[:41]]
16×
832

833
        except KeyError:
16×
834
            if routerid[0] != '$':
16×
835
                raise                   # just re-raise the KeyError
16×
836

837
            router = Router(self.protocol)
16×
838
            idhash = routerid[1:41]
16×
839
            nick = ''
16×
840
            is_named = False
16×
841
            if len(routerid) > 41:
16×
842
                nick = routerid[42:]
16×
843
                is_named = routerid[41] == '='
16×
844
            router.update(nick, hashFromHexId(idhash), '0' * 27, 'unknown',
16×
845
                          'unknown', '0', '0')
846
            router.name_is_unique = is_named
16×
847
            self.routers[router.id_hex] = router
16×
848
            return router
16×
849

850
    # implement IStreamListener
851

852
    def stream_new(self, stream):
16×
853
        "IStreamListener: a new stream has been created"
854
        txtorlog.msg("stream_new", stream)
16×
855

856
    def stream_succeeded(self, stream):
16×
857
        "IStreamListener: stream has succeeded"
858
        txtorlog.msg("stream_succeeded", stream)
16×
859

860
    def stream_attach(self, stream, circuit):
16×
861
        """
862
        IStreamListener: the stream has been attached to a circuit. It
863
        seems you get an attach to None followed by an attach to real
864
        circuit fairly frequently. Perhaps related to __LeaveStreamsUnattached?
865
        """
866
        txtorlog.msg("stream_attach", stream.id,
16×
867
                     stream.target_host, " -> ", circuit)
868

869
    def stream_detach(self, stream, **kw):
16×
870
        """
871
        IStreamListener
872
        """
873
        txtorlog.msg("stream_detach", stream.id)
16×
874

875
    def stream_closed(self, stream, **kw):
16×
876
        """
877
        IStreamListener: stream has been closed (won't be in
878
        controller's list anymore)
879
        """
880

881
        txtorlog.msg("stream_closed", stream.id)
16×
882
        del self.streams[stream.id]
16×
883

884
    def stream_failed(self, stream, **kw):
16×
885
        """
886
        IStreamListener: stream failed for some reason (won't be in
887
        controller's list anymore)
888
        """
889

890
        txtorlog.msg("stream_failed", stream.id)
16×
891
        del self.streams[stream.id]
16×
892

893
    # implement ICircuitListener
894

895
    def circuit_launched(self, circuit):
16×
896
        "ICircuitListener API"
897
        txtorlog.msg("circuit_launched", circuit)
16×
898
        self.circuits[circuit.id] = circuit
16×
899

900
    def circuit_extend(self, circuit, router):
16×
901
        "ICircuitListener API"
902
        txtorlog.msg("circuit_extend:", circuit.id, router)
16×
903

904
    def circuit_built(self, circuit):
16×
905
        "ICircuitListener API"
906
        txtorlog.msg(
16×
907
            "circuit_built:", circuit.id,
908
            "->".join("%s.%s" % (x.name, x.location.countrycode) for x in circuit.path),
909
            circuit.streams
910
        )
911

912
    def circuit_new(self, circuit):
16×
913
        "ICircuitListener API"
914
        txtorlog.msg("circuit_new:", circuit.id)
16×
915
        self.circuits[circuit.id] = circuit
16×
916

917
    def circuit_destroy(self, circuit):
16×
918
        "Used by circuit_closed and circuit_failed (below)"
919
        txtorlog.msg("circuit_destroy:", circuit.id)
16×
920
        circuit._when_built.fire(
16×
921
            Failure(Exception("Destroying circuit; will never hit BUILT"))
922
        )
923
        del self.circuits[circuit.id]
16×
924

925
    def circuit_closed(self, circuit, **kw):
16×
926
        "ICircuitListener API"
927
        txtorlog.msg("circuit_closed", circuit)
16×
928
        circuit._when_built.fire(
16×
929
            Failure(
930
                CircuitBuildClosedError(_extract_reason(kw))
931
            )
932
        )
933
        self.circuit_destroy(circuit)
16×
934

935
    def circuit_failed(self, circuit, **kw):
16×
936
        "ICircuitListener API"
937
        txtorlog.msg("circuit_failed", circuit, str(kw))
16×
938
        circuit._when_built.fire(
16×
939
            Failure(
940
                CircuitBuildFailedError(_extract_reason(kw))
941
            )
942
        )
943
        self.circuit_destroy(circuit)
16×
944

945

946
class CircuitBuildFailedError(Exception):
16×
947
    """
948
    This exception is thrown when a circuit we're building fails
949
    """
950
    def __init__(self, reason):
16×
951
        self.reason = reason
16×
952
        super(CircuitBuildFailedError, self).__init__(
16×
953
            "Circuit failed: {}".format(
954
                self.reason,
955
            )
956
        )
957

958

959
class CircuitBuildClosedError(Exception):
16×
960
    """
961
    This exception is thrown when a circuit we're building is closed
962
    """
963
    def __init__(self, reason):
16×
964
        self.reason = reason
16×
965
        super(CircuitBuildClosedError, self).__init__(
16×
966
            "Circuit closed: {}".format(
967
                self.reason,
968
            )
969
        )
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2022 Coveralls, Inc