• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / netius / 4006

pending completion
4006

push

travis-ci-com

joamag
chore: added ignore settings

6844 of 15290 relevant lines covered (44.76%)

3.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.62
/src/netius/common/http2.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Netius System
5
# Copyright (c) 2008-2018 Hive Solutions Lda.
6
#
7
# This file is part of Hive Netius System.
8
#
9
# Hive Netius System is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Netius System is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Netius System. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
8✔
23
""" The author(s) of the module """
24

25
__version__ = "1.0.0"
8✔
26
""" The version of the module """
27

28
__revision__ = "$LastChangedRevision$"
8✔
29
""" The revision number of the module """
30

31
__date__ = "$LastChangedDate$"
8✔
32
""" The last change date of the module """
33

34
__copyright__ = "Copyright (c) 2008-2018 Hive Solutions Lda."
8✔
35
""" The copyright for the module """
36

37
__license__ = "Apache License, Version 2.0"
8✔
38
""" The license for the module """
39

40
import struct
8✔
41
import tempfile
8✔
42
import contextlib
8✔
43

44
import netius
8✔
45

46
from . import http
8✔
47
from . import util
8✔
48
from . import parser
8✔
49

50
HEADER_SIZE = 9
8✔
51

52
SETTING_SIZE = 6
8✔
53

54
DATA = 0x00
8✔
55
HEADERS = 0x01
8✔
56
PRIORITY = 0x02
8✔
57
RST_STREAM = 0x03
8✔
58
SETTINGS = 0x04
8✔
59
PUSH_PROMISE = 0x05
8✔
60
PING = 0x06
8✔
61
GOAWAY = 0x07
8✔
62
WINDOW_UPDATE = 0x08
8✔
63
CONTINUATION = 0x09
8✔
64

65
PROTOCOL_ERROR = 0x01
8✔
66
INTERNAL_ERROR = 0x02
8✔
67
FLOW_CONTROL_ERROR = 0x03
8✔
68
SETTINGS_TIMEOUT = 0x04
8✔
69
STREAM_CLOSED = 0x05
8✔
70
FRAME_SIZE_ERROR = 0x06
8✔
71
REFUSED_STREAM = 0x07
8✔
72
CANCEL = 0x08
8✔
73
COMPRESSION_ERROR = 0x09
8✔
74
CONNECT_ERROR = 0x0a
8✔
75
ENHANCE_YOUR_CALM = 0x0b
8✔
76
INADEQUATE_SECURITY = 0x0c
8✔
77
HTTP_1_1_REQUIRED = 0x0d
8✔
78

79
SETTINGS_HEADER_TABLE_SIZE = 0x01
8✔
80
SETTINGS_ENABLE_PUSH = 0x02
8✔
81
SETTINGS_MAX_CONCURRENT_STREAMS = 0x03
8✔
82
SETTINGS_INITIAL_WINDOW_SIZE = 0x04
8✔
83
SETTINGS_MAX_FRAME_SIZE = 0x05
8✔
84
SETTINGS_MAX_HEADER_LIST_SIZE = 0x06
8✔
85

86
HTTP_20 = 4
8✔
87
""" The newly created version of the protocol, note that
88
this constant value should be created in a way that its value
89
is superior to the ones defined for previous versions """
90

91
HEADER_STATE = 1
8✔
92
""" The initial header state for which the header
93
of the frame is going to be parsed and loaded """
94

95
PAYLOAD_STATE = 2
8✔
96
""" The second state of the frame parsing where the
97
payload of the frame is going to be loaded """
98

99
FINISH_STATE = 3
8✔
100
""" The final finish state to be used when the parsing
101
of the frame has been finished """
102

103
HTTP2_WINDOW = 65535
8✔
104
""" The default/initial size of the window used for the
105
flow control of both connections and streams """
106

107
HTTP2_FRAME_SIZE = 16384
8✔
108
""" The base default value for the maximum size allowed
109
from the frame, this includes the header value """
110

111
HTTP2_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
8✔
112
""" The preface string to be sent by the client upon
113
the establishment of the connection """
114

115
HTTP2_PSEUDO = (":method", ":scheme", ":path", ":authority", ":status")
8✔
116
""" The complete set of HTTP 2 based pseudo-header values
117
this list should be inclusive and limited """
118

119
HTTP2_TUPLES = (
8✔
120
    (SETTINGS_HEADER_TABLE_SIZE, "SETTINGS_HEADER_TABLE_SIZE"),
121
    (SETTINGS_ENABLE_PUSH, "SETTINGS_ENABLE_PUSH"),
122
    (SETTINGS_MAX_CONCURRENT_STREAMS, "SETTINGS_MAX_CONCURRENT_STREAMS"),
123
    (SETTINGS_INITIAL_WINDOW_SIZE, "SETTINGS_INITIAL_WINDOW_SIZE"),
124
    (SETTINGS_MAX_FRAME_SIZE, "SETTINGS_MAX_FRAME_SIZE"),
125
    (SETTINGS_MAX_HEADER_LIST_SIZE, "SETTINGS_MAX_HEADER_LIST_SIZE")
126
)
127
""" The sequence of tuples that associate the constant value of the
128
setting with the proper string representation for it """
129

130
HTTP2_NAMES = {
8✔
131
    DATA : "DATA",
132
    HEADERS : "HEADERS",
133
    PRIORITY : "PRIORITY",
134
    RST_STREAM : "RST_STREAM",
135
    SETTINGS : "SETTINGS",
136
    PUSH_PROMISE : "PUSH_PROMISE",
137
    PING : "PING",
138
    GOAWAY : "GOAWAY",
139
    WINDOW_UPDATE : "WINDOW_UPDATE",
140
    CONTINUATION : "CONTINUATION"
141
}
142
""" The association between the various types of frames
143
described as integers and their representation as strings """
144

145
HTTP2_SETTINGS = {
8✔
146
    SETTINGS_HEADER_TABLE_SIZE : 4096,
147
    SETTINGS_ENABLE_PUSH : 1,
148
    SETTINGS_MAX_CONCURRENT_STREAMS : 128,
149
    SETTINGS_INITIAL_WINDOW_SIZE : 65535,
150
    SETTINGS_MAX_FRAME_SIZE : 16384,
151
    SETTINGS_MAX_HEADER_LIST_SIZE : 16384
152
}
153
""" The default values to be used for settings of a newly
154
created connection, this should be defined according to specification """
155

156
HTTP2_SETTINGS_OPTIMAL = {
8✔
157
    SETTINGS_HEADER_TABLE_SIZE : 4096,
158
    SETTINGS_MAX_CONCURRENT_STREAMS : 512,
159
    SETTINGS_INITIAL_WINDOW_SIZE : 1048576,
160
    SETTINGS_MAX_FRAME_SIZE : 131072,
161
    SETTINGS_MAX_HEADER_LIST_SIZE : 16384
162
}
163
""" The optimal settings meant to be used by an infrastructure
164
deployed in a production environment """
165

166
HTTP2_SETTINGS_T = netius.legacy.items(HTTP2_SETTINGS)
8✔
167
""" The tuple sequence version of the settings defaults """
168

169
HTTP2_SETTINGS_OPTIMAL_T = netius.legacy.items(HTTP2_SETTINGS_OPTIMAL)
8✔
170
""" The tuple sequence version of the settings optimal """
171

172
class HTTP2Parser(parser.Parser):
    """
    Incremental parser for the HTTP 2 framing layer, it receives
    chunks of raw bytes, splits them into frames (header plus
    payload) and dispatches every frame to the parsing method
    that matches its type, triggering the associated events.

    The parser also keeps the registry of the streams (indexed
    by identifier) that are created from HEADERS frames.

    :see: https://tools.ietf.org/html/rfc7540
    """

    FIELDS = (
        "_pid",
        "store",
        "file_limit",
        "state",
        "keep_alive",
        "length",
        "type",
        "flags",
        "stream",
        "end_headers",
        "last_type",
        "last_stream",
        "last_end_headers"
    )

    def __init__(
        self,
        owner,
        store = False,
        file_limit = http.FILE_LIMIT
    ):
        """
        Creates the parser, building the state tables and setting
        the per frame values to their initial state.

        :type owner: Object
        :param owner: The owner of the parser, it is also stored as
        the connection and must expose the `settings`, `settings_r`
        and `window` attributes read by the parser.
        :type store: bool
        :param store: Value forwarded to the streams created by the
        parser (as the `store` argument).
        :type file_limit: int
        :param file_limit: Value forwarded to the streams created by
        the parser (as the `file_limit` argument).
        """

        parser.Parser.__init__(self, owner)

        self.build()
        self.reset(
            store = store,
            file_limit = file_limit
        )

    def build(self):
        """
        Builds the initial set of states ordered according to
        their internal integer definitions, this method provides
        a fast and scalable way of parsing data.
        """

        parser.Parser.build(self)

        self.connection = self.owner

        # the states tuple is indexed by (state - 1), so the order
        # must match the HEADER_STATE and PAYLOAD_STATE constants
        self.states = (
            self._parse_header,
            self._parse_payload
        )
        self.state_l = len(self.states)

        # the parsers tuple is indexed by the frame type integer, so
        # the order must match the frame type constants (DATA = 0x00
        # up to CONTINUATION = 0x09)
        self.parsers = (
            self._parse_data,
            self._parse_headers,
            self._parse_priority,
            self._parse_rst_stream,
            self._parse_settings,
            self._parse_push_promise,
            self._parse_ping,
            self._parse_goaway,
            self._parse_window_update,
            self._parse_continuation
        )

        self.streams = {}
        self._max_stream = 0
        self._encoder = None
        self._decoder = None

    def destroy(self):
        """
        Destroys the current structure for the parser meaning that
        it's restored to the original values, this method should only
        be called on situation where no more parser usage is required.
        """

        parser.Parser.destroy(self)

        # iterates over the complete set of associated streams to close
        # them as the parser is now going to be destroyed and they cannot
        # be reached any longer (invalidated state)
        streams = netius.legacy.values(self.streams)
        for stream in streams: stream.close()

        self.connection = None
        self.states = ()
        self.state_l = 0
        self.parsers = ()
        self.streams = {}
        self._max_stream = 0
        self._encoder = None
        self._decoder = None

    def info_dict(self):
        """
        Retrieves the information dictionary of the parent parser
        extended with the `streams` entry (see `info_streams`).

        :rtype: Dictionary
        :return: The information dictionary for the parser.
        """

        info = parser.Parser.info_dict(self)
        info.update(
            streams = self.info_streams()
        )
        return info

    def info_streams(self):
        """
        Builds the list with the information dictionary of every
        registered stream, ordered by stream identifier.

        :rtype: List
        :return: The `info_dict()` result of each registered stream.
        """

        return [
            self.streams[identifier].info_dict()
            for identifier in sorted(self.streams)
        ]

    def reset(
        self,
        store = False,
        file_limit = http.FILE_LIMIT
    ):
        """
        Restores the per frame values of the parser to their
        initial state, the parser goes back to the header state.

        The stream registry is not touched by this operation.

        :type store: bool
        :param store: The new value for the `store` attribute.
        :type file_limit: int
        :param file_limit: The new value for the `file_limit` attribute.
        """

        self.store = store
        self.file_limit = file_limit
        self.state = HEADER_STATE
        self.buffer = []
        self.keep_alive = True
        self.payload = None
        self.length = 0
        self.type = 0
        self.flags = 0
        self.stream = 0
        self.stream_o = None
        self.end_headers = False
        self.last_type = 0
        self.last_stream = 0
        self.last_end_headers = False

    def clear(self, force = False, save = True):
        """
        Resets the parser so that a new frame may be parsed,
        keeping the current `store` and `file_limit` values.

        :type force: bool
        :param force: If the reset should be performed even when
        the parser is already in the header state.
        :type save: bool
        :param save: If the type, stream and end headers values of
        the frame being cleared should be kept as the `last_*` values.
        """

        if not force and self.state == HEADER_STATE: return

        # saves the values of the current frame as they are going
        # to be overwritten by the reset operation
        previous = (self.type, self.stream, self.end_headers)

        self.reset(
            store = self.store,
            file_limit = self.file_limit
        )
        if not save: return

        self.last_type, self.last_stream, self.last_end_headers = previous

    def close(self):
        """
        Closes the parser, no operation is performed.
        """

        pass

    def parse(self, data):
        """
        Parses the provided data chunk, changing the current
        state of the parser accordingly and returning the
        number of processed bytes from it.

        :type data: String
        :param data: The string containing the data to be parsed
        in the current parse operation.
        :rtype: int
        :return: The amount of bytes of the data string that have
        been "parsed" in the current parse operation.
        """

        parser.Parser.parse(self, data)

        # in case the current state of the parser is finished, must
        # reset the state to the start position as the parser is
        # re-starting (probably a new data sequence)
        if self.state == FINISH_STATE: self.clear()

        # retrieves the size of the data that has been sent for parsing
        # and saves it under the size original variable
        size = len(data)
        size_o = size

        # iterates continuously to try to process all that data that
        # has been sent for processing, a frame with an empty payload
        # must also be processed even if there's no data remaining,
        # otherwise a frame like a SETTINGS (ACK) or an empty DATA
        # (END_STREAM) that ends the chunk would only be dispatched
        # when the next chunk of data arrives
        while size > 0 or self._empty_payload():

            if self.state <= self.state_l:
                method = self.states[self.state - 1]
                count = method(data)
                if count == -1: break
                if count == 0: continue

                size -= count
                data = data[count:]

                continue

            elif self.state == FINISH_STATE:
                self.clear()

                continue

            else:
                raise netius.ParserError("Invalid state '%d'" % self.state)

        # in case not all of the data has been processed
        # must add it to the buffer so that it may be used
        # later in the next parsing of the message
        if size > 0: self.buffer.append(data)

        # returns the number of read (processed) bytes of the
        # data that has been sent to the parser
        return size_o - size

    def get_type_s(self, type):
        """
        Retrieves the string based representation of the frame
        type according to the HTTP2 specification.

        :type type: int
        :param type: The frame type as an integer that is going
        to be converted to the string representation.
        :rtype: String
        :return: The string based representation of the frame type,
        or an invalid value (none) for an unknown type.
        """

        return HTTP2_NAMES.get(type, None)

    def assert_header(self):
        """
        Runs a series of assertion operations related with the
        header of the frame, making sure it remains compliant
        with the HTTP 2 specification.
        """

        if self.length > self.owner.settings[SETTINGS_MAX_FRAME_SIZE]:
            raise netius.ParserError(
                "Headers are greater than SETTINGS_MAX_FRAME_SIZE",
                stream = self.stream,
                error_code = FRAME_SIZE_ERROR
            )
        if self.last_type in (HEADERS, CONTINUATION) and not\
            self.last_end_headers and not self.last_stream == self.stream:
            raise netius.ParserError(
                "Cannot send frame from a different stream in middle of headers",
                error_code = PROTOCOL_ERROR
            )

    def assert_stream(self, stream):
        """
        Validates a stream that is about to be registered in the
        parser (as the result of the parsing of a HEADERS frame).

        :type stream: HTTP2Stream
        :param stream: The stream to be validated.
        """

        if not stream.identifier % 2 == 1:
            raise netius.ParserError(
                "Stream identifiers must be odd",
                error_code = PROTOCOL_ERROR
            )
        if stream.dependency == stream.identifier:
            raise netius.ParserError(
                "Stream cannot depend on itself",
                error_code = PROTOCOL_ERROR
            )

        # the concurrency limit only applies to streams that are not
        # yet registered, an existing stream (eg: one receiving trailer
        # headers) does not add to the number of concurrent streams
        is_new = not stream.identifier in self.streams
        limit = self.owner.settings[SETTINGS_MAX_CONCURRENT_STREAMS]
        if is_new and len(self.streams) >= limit:
            raise netius.ParserError(
                "Too many streams (greater than SETTINGS_MAX_CONCURRENT_STREAMS)",
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )

    def assert_data(self, stream, end_stream):
        """
        Validates that a DATA frame may be received by the
        provided stream in its current state.
        """

        if self.stream == 0x00:
            raise netius.ParserError(
                "Stream cannot be set to 0x00 for DATA",
                error_code = PROTOCOL_ERROR
            )
        if not stream.end_headers:
            raise netius.ParserError(
                "Not ready to receive DATA open",
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )
        if stream.end_stream and stream.end_headers:
            raise netius.ParserError(
                "Not ready to receive DATA half closed (remote)",
                stream = self.stream,
                error_code = STREAM_CLOSED
            )

    def assert_headers(self, stream, end_stream):
        """
        Validates that a HEADERS frame may be received by an
        already existing stream (second HEADERS frame).
        """

        if stream.end_stream and stream.end_headers:
            raise netius.ParserError(
                "Not ready to receive HEADERS half closed (remote)",
                stream = self.stream,
                error_code = STREAM_CLOSED
            )
        if not end_stream:
            raise netius.ParserError(
                "Second HEADERS without END_STREAM flag",
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )

    def assert_priority(self, stream, dependency):
        """
        Validates the values of a PRIORITY frame, the dependency
        is expected to have the exclusive bit already removed.
        """

        if self.stream == 0x00:
            raise netius.ParserError(
                "Stream cannot be set to 0x00 for PRIORITY",
                error_code = PROTOCOL_ERROR
            )
        if dependency == self.stream:
            raise netius.ParserError(
                "Stream cannot depend on current stream",
                error_code = PROTOCOL_ERROR
            )
        if stream and dependency == stream.identifier:
            raise netius.ParserError(
                "Stream cannot depend on itself",
                error_code = PROTOCOL_ERROR
            )

    def assert_rst_stream(self, stream):
        """
        Validates that a RST_STREAM frame targets a (non zero)
        stream that has already been created.
        """

        if self.stream == 0x00:
            raise netius.ParserError(
                "Stream cannot be set to 0x00 for RST_STREAM",
                error_code = PROTOCOL_ERROR
            )
        if self.stream > self._max_stream:
            raise netius.ParserError(
                "Stream has not been created for RST_STREAM",
                error_code = PROTOCOL_ERROR
            )

    def assert_settings(self, settings, ack, extended = True):
        """
        Validates a SETTINGS frame, both its framing and (if the
        extended flag is set) the values of the settings.

        :type settings: List
        :param settings: The sequence of (identifier, value) tuples
        that have been parsed from the frame.
        :type ack: bool
        :param ack: If the ACK flag is set in the frame.
        :type extended: bool
        :param extended: If the values of the settings should also
        be validated.
        """

        if not self.stream == 0x00:
            raise netius.ParserError(
                "Stream must be set to 0x00 for SETTINGS",
                error_code = PROTOCOL_ERROR
            )
        if ack and not self.length == 0:
            raise netius.ParserError(
                "SETTINGS with ACK must be zero length",
                error_code = FRAME_SIZE_ERROR
            )
        if not self.length % SETTING_SIZE == 0:
            raise netius.ParserError(
                "Size of SETTINGS frame must be a multiple of 6",
                error_code = FRAME_SIZE_ERROR
            )
        if not extended: return
        settings = dict(settings)
        if not settings.get(SETTINGS_ENABLE_PUSH, 0) in (0, 1):
            raise netius.ParserError(
                "Value of SETTINGS_ENABLE_PUSH different from 0 or 1",
                error_code = PROTOCOL_ERROR
            )
        if settings.get(SETTINGS_INITIAL_WINDOW_SIZE, 0) > 2147483647:
            raise netius.ParserError(
                "Value of SETTINGS_INITIAL_WINDOW_SIZE too large",
                error_code = FLOW_CONTROL_ERROR
            )
        if settings.get(SETTINGS_MAX_FRAME_SIZE, 16384) < 16384:
            raise netius.ParserError(
                "Value of SETTINGS_MAX_FRAME_SIZE too small",
                error_code = PROTOCOL_ERROR
            )
        if settings.get(SETTINGS_MAX_FRAME_SIZE, 16384) > 16777215:
            raise netius.ParserError(
                "Value of SETTINGS_MAX_FRAME_SIZE too large",
                error_code = PROTOCOL_ERROR
            )

    def assert_push_promise(self, promised_stream):
        """
        Rejects any PUSH_PROMISE frame, an error is always raised.
        """

        raise netius.ParserError(
            "PUSH_PROMISE not allowed for server",
            error_code = PROTOCOL_ERROR
        )

    def assert_ping(self):
        """
        Validates the framing (stream and size) of a PING frame.
        """

        if not self.stream == 0x00:
            raise netius.ParserError(
                "Stream must be set to 0x00 for PING",
                error_code = PROTOCOL_ERROR
            )
        if not self.length == 8:
            raise netius.ParserError(
                "Size of PING frame must be 8",
                error_code = FRAME_SIZE_ERROR
            )

    def assert_goaway(self):
        """
        Validates that a GOAWAY frame has been sent on stream zero.
        """

        if not self.stream == 0x00:
            raise netius.ParserError(
                "Stream must be set to 0x00 for GOAWAY",
                error_code = PROTOCOL_ERROR
            )

    def assert_window_update(self, stream, increment):
        """
        Validates the increment of a WINDOW_UPDATE frame against
        the window that is going to be changed by it.

        :type stream: HTTP2Stream
        :param stream: The target stream, invalid (unset) for frames
        sent on stream zero or for streams not found.
        :type increment: int
        :param increment: The increment with the reserved bit removed.
        """

        if increment == 0:
            raise netius.ParserError(
                "WINDOW_UPDATE increment must not be zero",
                error_code = PROTOCOL_ERROR
            )

        # the connection window is only changed by the frames sent on
        # stream zero, validating it for stream frames would raise
        # an error for a window that is not going to be changed
        if self.stream == 0x00 and\
            self.owner.window + increment > 2147483647:
            raise netius.ParserError(
                "Window value for the connection too large",
                error_code = FLOW_CONTROL_ERROR
            )
        if stream and stream.window + increment > 2147483647:
            raise netius.ParserError(
                "Window value for the stream too large",
                error_code = FLOW_CONTROL_ERROR
            )

    def assert_continuation(self, stream):
        """
        Validates that a CONTINUATION frame may be received by the
        stream and that it follows a frame carrying headers.
        """

        if stream.end_stream and stream.end_headers:
            raise netius.ParserError(
                "Not ready to receive CONTINUATION half closed (remote)",
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )
        if not self.last_type in (HEADERS, PUSH_PROMISE, CONTINUATION):
            raise netius.ParserError(
                "CONTINUATION without HEADERS, PUSH_PROMISE or CONTINUATION before",
                error_code = PROTOCOL_ERROR
            )

    @property
    def type_s(self):
        """
        The string representation of the type of the current frame.
        """

        return self.get_type_s(self.type)

    def _parse_header(self, data):
        """
        Parses the (fixed size) header of a frame, using any data
        previously buffered plus the provided one.

        :rtype: int
        :return: The number of bytes consumed from the provided data
        or minus one if there's not enough data for the header.
        """

        if len(data) + self.buffer_size < HEADER_SIZE: return -1

        size = HEADER_SIZE - self.buffer_size
        data = self.buffer_data + data[:size]

        # the length is a 24 bit value so it's unpacked as an 8 bit
        # plus a 16 bit value and then re-constructed
        header = struct.unpack("!BHBBI", data)
        extra, self.length, self.type, self.flags, self.stream = header
        self.length += extra << 16

        # removes the reserved (most significant) bit from the stream
        # identifier as it must be ignored when receiving
        self.stream &= 0x7fffffff

        self.assert_header()

        self.state = PAYLOAD_STATE
        self.trigger("on_header", header)

        return size

    def _parse_payload(self, data):
        """
        Parses the payload of a frame, dispatching it to the parsing
        method associated with the type of the frame.

        :rtype: int
        :return: The number of bytes consumed from the provided data
        or minus one if there's not enough data for the payload.
        """

        if len(data) + self.buffer_size < self.length: return -1

        size = self.length - self.buffer_size
        data = self.buffer_data + data[:size]

        valid_type = self.type < len(self.parsers)
        if not valid_type: self._invalid_type()

        self.payload = data
        self.trigger("on_payload")

        parse_method = self.parsers[self.type]
        parse_method(data)

        self.state = FINISH_STATE
        self.trigger("on_frame")

        return size

    def _parse_data(self, data):
        """
        Parses the payload of a DATA frame, adding the contents
        (padding removed) to the target stream.
        """

        data_l = len(data)

        end_stream = True if self.flags & 0x01 else False

        index, padded_l = self._read_padding(data)
        self._assert_padding(data_l, index, padded_l)

        contents = data[index:data_l - padded_l]

        stream = self._get_stream(self.stream)
        self.assert_data(stream, end_stream)

        stream.extend_data(contents)
        stream.end_stream = end_stream

        self.trigger("on_data_h2", stream, contents)

        self.trigger("on_partial", contents)
        if stream.is_ready: self.trigger("on_data")

    def _parse_headers(self, data):
        """
        Parses the payload of a HEADERS frame, creating the target
        stream (or updating an existing one) with the fragment of
        the header block and the priority information.
        """

        data_l = len(data)

        end_stream = True if self.flags & 0x01 else False
        end_headers = True if self.flags & 0x04 else False
        priority = self.flags & 0x20

        dependency = 0
        weight = 0
        exclusive = 0

        index, padded_l = self._read_padding(data)

        if priority:
            # the priority fields are mandatory once the flag is set
            # so the payload must be large enough to contain them
            if data_l - index < 5:
                raise netius.ParserError(
                    "HEADERS frame too small for the priority fields",
                    error_code = FRAME_SIZE_ERROR
                )
            dependency, weight = struct.unpack("!IB", data[index:index + 5])
            exclusive = True if dependency & 0x80000000 else False
            dependency = dependency & 0x7fffffff
            index += 5

        self._assert_padding(data_l, index, padded_l)

        # retrieves the (headers) fragment part of the payload, this is
        # going to be used as the basis for the header decoding
        fragment = data[index:data_l - padded_l]

        # retrieves the value of the window initial size from the owner
        # connection this is the value to be set in the new stream and
        # then retrieves the (maximum) frame size allowed to be passed
        # to the new stream instance for proper data frame fragmentation
        # these values are associated with the remote peer settings
        window = self.owner.settings_r[SETTINGS_INITIAL_WINDOW_SIZE]
        frame_size = self.owner.settings_r[SETTINGS_MAX_FRAME_SIZE]

        # tries to retrieve a previously opened stream, this may be
        # the case it has been opened by a previous frame operation
        stream = self._get_stream(self.stream, strict = False, closed_s = True)

        if stream:
            # runs the headers assertion operation and then updates the
            # various elements in the currently opened stream accordingly
            self.assert_headers(stream, end_stream)
            stream.extend_headers(fragment)
            if dependency: stream.dependency = dependency
            if weight: stream.weight = weight
            if exclusive: stream.exclusive = exclusive
            if end_headers: stream.end_headers = end_headers
            if end_stream: stream.end_stream = end_stream
        else:
            # constructs the stream structure for the current stream that
            # is being open/created using the current owner, headers and
            # other information as the basis for such construction
            stream = HTTP2Stream(
                owner = self,
                identifier = self.stream,
                header_b = fragment,
                dependency = dependency,
                weight = weight,
                exclusive = exclusive,
                end_headers = end_headers,
                end_stream = end_stream,
                store = self.store,
                file_limit = self.file_limit,
                window = window,
                frame_size = frame_size
            )

            # ensures that the stream object is properly open, this should
            # enable the stream to start performing operations
            stream.open()

        # updates the current parser value for the end headers flag
        # this is going to be used to determine if the current state
        # of the connection is (loading/parsing) headers
        self.end_headers = end_headers

        # runs the assertion for the stream, it must be correctly
        # validated for some of its values
        self.assert_stream(stream)

        # sets the stream under the current parser meaning that it can
        # be later retrieved for proper event propagation
        self._set_stream(stream)

        self.trigger("on_headers_h2", stream)

        if stream.end_headers: stream._calculate()
        if stream.end_headers: self.trigger("on_headers")
        if stream.is_ready: self.trigger("on_data")

    def _parse_priority(self, data):
        """
        Parses the payload of a PRIORITY frame, updating the
        priority information of the target stream (if it exists).
        """

        if not len(data) == 5:
            raise netius.ParserError(
                "Size of PRIORITY frame must be 5",
                stream = self.stream,
                error_code = FRAME_SIZE_ERROR
            )

        # the most significant bit of the dependency is the exclusive
        # flag and must be removed to obtain the stream identifier
        dependency, weight = struct.unpack("!IB", data)
        exclusive = True if dependency & 0x80000000 else False
        dependency = dependency & 0x7fffffff

        # validates the frame before changing the stream so that an
        # invalid dependency is never applied
        stream = self._get_stream(self.stream, strict = False)
        self.assert_priority(stream, dependency)

        if stream:
            stream.dependency = dependency
            stream.weight = weight
            stream.exclusive = exclusive

        self.trigger("on_priority", stream, dependency, weight)

    def _parse_rst_stream(self, data):
        """
        Parses the payload of a RST_STREAM frame, triggering
        the event with the target stream and the error code.
        """

        if not len(data) == 4:
            raise netius.ParserError(
                "Size of RST_STREAM frame must be 4",
                error_code = FRAME_SIZE_ERROR
            )
        error_code, = struct.unpack("!I", data)
        stream = self._get_stream(self.stream, strict = False)
        self.assert_rst_stream(stream)
        self.trigger("on_rst_stream", stream, error_code)

    def _parse_settings(self, data):
        """
        Parses the payload of a SETTINGS frame into a sequence
        of (identifier, value) tuples and triggers the event.
        """

        ack = self.flags & 0x01
        count = self.length // SETTING_SIZE

        settings = [
            struct.unpack("!HI", data[base:base + SETTING_SIZE])
            for base in netius.legacy.xrange(0, count * SETTING_SIZE, SETTING_SIZE)
        ]

        self.assert_settings(settings, ack)

        self.trigger("on_settings", settings, ack)

    def _parse_push_promise(self, data):
        """
        Parses the payload of a PUSH_PROMISE frame, note that the
        assertion for this frame always raises an error, so the
        event is only triggered if the assertion is overridden.
        """

        data_l = len(data)

        end_headers = True if self.flags & 0x04 else False

        index, padded_l = self._read_padding(data)

        if data_l - index < 4:
            raise netius.ParserError(
                "PUSH_PROMISE frame too small for the promised stream",
                error_code = FRAME_SIZE_ERROR
            )

        # unpacks the promised stream identifier, removing the reserved
        # bit, and moves past it so that the fragment only contains
        # the header block
        promised_stream, = struct.unpack("!I", data[index:index + 4])
        promised_stream = promised_stream & 0x7fffffff
        index += 4

        self._assert_padding(data_l, index, padded_l)

        fragment = data[index:data_l - padded_l]

        self.assert_push_promise(promised_stream)

        self.trigger("on_push_promise", promised_stream, fragment, end_headers)

    def _parse_ping(self, data):
        """
        Parses the payload of a PING frame, triggering the event
        with the opaque data and the value of the ACK flag.
        """

        ack = self.flags & 0x01
        self.assert_ping()
        self.trigger("on_ping", data, ack)

    def _parse_goaway(self, data):
        """
        Parses the payload of a GOAWAY frame, triggering the event
        with the last stream, the error code and the extra data.
        """

        if len(data) < 8:
            raise netius.ParserError(
                "Size of GOAWAY frame must be at least 8",
                error_code = FRAME_SIZE_ERROR
            )
        last_stream, error_code = struct.unpack("!II", data[:8])
        last_stream = last_stream & 0x7fffffff
        extra = data[8:]
        self.assert_goaway()
        self.trigger("on_goaway", last_stream, error_code, extra)

    def _parse_window_update(self, data):
        """
        Parses the payload of a WINDOW_UPDATE frame, triggering the
        event with the target stream (if any) and the increment.
        """

        if not len(data) == 4:
            raise netius.ParserError(
                "Size of WINDOW_UPDATE frame must be 4",
                error_code = FRAME_SIZE_ERROR
            )

        # the most significant bit of the increment is reserved
        # and must be ignored when receiving
        increment, = struct.unpack("!I", data)
        increment = increment & 0x7fffffff

        stream = self._get_stream(
            self.stream,
            strict = False,
            unopened_s = True
        )
        self.assert_window_update(stream, increment)

        # in case the frame targets a stream that is no longer
        # registered there's nothing to be updated
        if self.stream and not stream: return
        self.trigger("on_window_update", stream, increment)

    def _parse_continuation(self, data):
        """
        Parses the payload of a CONTINUATION frame, adding the
        fragment of the header block to the target stream.
        """

        end_headers = True if self.flags & 0x04 else False

        stream = self._get_stream(self.stream)
        self.assert_continuation(stream)

        stream.extend_headers(data)
        stream.end_headers = end_headers
        self.end_headers = end_headers

        stream.decode_headers()

        self.trigger("on_continuation", stream)

        if stream.end_headers: stream._calculate()
        if stream.end_headers: self.trigger("on_headers")
        if stream.end_headers and stream.end_stream:
            self.trigger("on_data")

    def _has_stream(self, stream):
        """
        Checks if the stream with the provided identifier is
        currently registered in the parser.
        """

        return stream in self.streams

    def _get_stream(
        self,
        stream = None,
        default = None,
        strict = True,
        closed_s = False,
        unopened_s = False,
        exists_s = False
    ):
        """
        Retrieves the stream registered for the provided identifier,
        optionally raising an error if it's not registered.

        :type stream: int
        :param stream: The identifier of the stream, defaults to the
        stream of the frame currently being parsed.
        :type default: Object
        :param default: The value returned for stream zero or for
        streams not registered (and not rejected by a flag).
        :type strict: bool
        :param strict: If all the (three) validations should be run.
        :type closed_s: bool
        :param closed_s: If an error should be raised for a missing
        stream with an identifier up to the maximum one registered.
        :type unopened_s: bool
        :param unopened_s: If an error should be raised for a missing
        stream with an identifier above the maximum one registered.
        :type exists_s: bool
        :param exists_s: If an error should be raised for any
        missing stream.
        :rtype: HTTP2Stream
        :return: The stream found or the default value.
        """

        if stream is None: stream = self.stream
        if stream == 0: return default
        if strict: closed_s = True; unopened_s = True; exists_s = True
        exists = stream in self.streams
        if closed_s and not exists and stream <= self._max_stream:
            raise netius.ParserError(
                "Invalid or closed stream '%d'" % stream,
                stream = self.stream,
                error_code = STREAM_CLOSED
            )
        if unopened_s and not exists and stream > self._max_stream:
            raise netius.ParserError(
                "Invalid or unopened stream '%d'" % stream,
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )
        if exists_s and not exists:
            raise netius.ParserError(
                "Invalid stream '%d'" % stream,
                stream = self.stream,
                error_code = PROTOCOL_ERROR
            )
        self.stream_o = self.streams.get(stream, default)
        return self.stream_o

    def _set_stream(self, stream):
        """
        Registers the stream in the parser (by identifier), updating
        the maximum stream identifier seen so far.
        """

        self.streams[stream.identifier] = stream
        self.stream_o = stream
        self._max_stream = max(self._max_stream, stream.identifier)

    def _del_stream(self, stream):
        """
        Removes the stream with the provided identifier from the
        parser, nothing is done if it's not registered.
        """

        if not stream in self.streams: return
        del self.streams[stream]
        self.stream_o = None

    def _invalid_type(self):
        """
        Raises the error for a frame with an unknown type, such
        frame is marked to be ignored unless it's received in the
        middle of a header block, which is a protocol error.
        """

        # a header block is in progress if the previous frame carried
        # headers and did not have the END_HEADERS flag set
        in_headers = self.last_type in (HEADERS, CONTINUATION) and\
            not self.last_end_headers
        if in_headers:
            raise netius.ParserError(
                "Invalid frame type",
                error_code = PROTOCOL_ERROR
            )
        raise netius.ParserError("Invalid frame type", ignore = True)

    def _empty_payload(self):
        """
        Checks if the parser is waiting for the payload of a frame
        that has no payload, such frame may be processed right away
        as no extra data is required.
        """

        return self.state == PAYLOAD_STATE and self.length == 0

    def _read_padding(self, data):
        """
        Reads the pad length field from the start of the payload
        in case the PADDED flag is set for the current frame.

        :rtype: Tuple
        :return: The number of bytes used by the pad length field
        and the length of the padding (both zero if not padded).
        """

        if not self.flags & 0x08: return 0, 0
        if len(data) < 1:
            raise netius.ParserError(
                "Padded frame is missing the pad length field",
                error_code = FRAME_SIZE_ERROR
            )
        padded_l, = struct.unpack("!B", data[:1])
        return 1, padded_l

    def _assert_padding(self, data_l, index, padded_l):
        """
        Ensures that the padding fits in the part of the payload
        that follows the already parsed fields, otherwise the slice
        of the contents would use a negative (wrapping) index.
        """

        if padded_l > data_l - index:
            raise netius.ParserError(
                "Padding exceeds the size of the frame payload",
                error_code = PROTOCOL_ERROR
            )

    @property
    def buffer_size(self):
        """
        The total number of bytes currently stored in the buffer.
        """

        return sum(len(data) for data in self.buffer)

    @property
    def buffer_data(self, empty = True):
        """
        The contents of the buffer joined as a single value, note
        that as this is a property the empty parameter always takes
        the default value, so the buffer is emptied on every access.
        """

        data = b"".join(self.buffer)
        if empty: del self.buffer[:]
        return data

    @property
    def encoder(self):
        """
        The HPACK encoder of the parser, created on first access
        so that the hpack module is only imported when required.
        """

        if self._encoder: return self._encoder
        import hpack
        self._encoder = hpack.hpack.Encoder()
        return self._encoder

    @property
    def decoder(self):
        """
        The HPACK decoder of the parser, created on first access
        so that the hpack module is only imported when required.
        """

        if self._decoder: return self._decoder
        import hpack
        self._decoder = hpack.hpack.Decoder()
        return self._decoder
×
911

912
class HTTP2Stream(netius.Stream):
8✔
913
    """
914
    Object representing a stream of data interchanged between two
915
    peers under the HTTP 2 protocol.
916

917
    A stream may be considered a node in a tree of dependencies,
918
    the children references are stored on the parent node.
919

920
    Should be compatible with both the parser and the connection
921
    interfaces and may be used for both types of operations.
922

923
    :see: https://tools.ietf.org/html/rfc7540
924
    """
925

926
    def __init__(
        self,
        identifier = None,
        header_b = None,
        dependency = 0x00,
        weight = 1,
        exclusive = False,
        end_headers = False,
        end_stream = False,
        end_stream_l = False,
        store = False,
        file_limit = http.FILE_LIMIT,
        window = HTTP2_WINDOW,
        frame_size = HTTP2_FRAME_SIZE,
        *args,
        **kwargs
    ):
        """
        Constructor of the stream, stores the provided values and
        runs the reset operation to build the remaining state.

        :type identifier: int
        :param identifier: The identifier of the stream.
        :type header_b: String
        :param header_b: The initial fragment of the header block, in
        case it's not provided the header buffer starts empty.
        :type dependency: int
        :param dependency: Value stored as the stream dependency.
        :type weight: int
        :param weight: Value stored as the stream weight.
        :type exclusive: bool
        :param exclusive: Value stored as the exclusive flag.
        :type end_headers: bool
        :param end_headers: If the header block is already complete.
        :type end_stream: bool
        :param end_stream: Value stored as the end of stream flag.
        :type end_stream_l: bool
        :param end_stream_l: Value stored as the `end_stream_l` flag.
        :type store: bool
        :param store: Passed to the reset operation (store flag).
        :type file_limit: int
        :param file_limit: Passed to the reset operation (file limit).
        :type window: int
        :param window: Passed to the reset operation (window size).
        :type frame_size: int
        :param frame_size: Passed to the reset operation (frame size).
        """

        netius.Stream.__init__(self, *args, **kwargs)
        self.identifier = identifier

        # a missing header block must not be stored as a fragment,
        # otherwise the buffer would be considered non empty by the
        # header decoding and the joining with extra fragments fails
        self.header_b = [] if header_b is None else [header_b]

        self.dependency = dependency
        self.weight = weight
        self.exclusive = exclusive
        self.end_headers = end_headers
        self.end_stream = end_stream
        self.end_stream_l = end_stream_l
        self.reset(
            store = store,
            file_limit = file_limit,
            window = window,
            frame_size = frame_size
        )
958

959
    def __getattr__(self, name):
8✔
960
        if hasattr(self.connection, name):
×
961
            return getattr(self.connection, name)
×
962
        raise AttributeError("'%s' not found" % name)
×
963

964
    def reset(
        self,
        store = False,
        file_limit = http.FILE_LIMIT,
        window = HTTP2_WINDOW,
        frame_size = HTTP2_FRAME_SIZE
    ):
        """
        Restores the stream to its initial state, setting the storage
        and window values from the arguments and clearing the request
        related values (headers, method, path, data buffer, etc.).

        :type store: bool
        :param store: Value for the store flag.
        :type file_limit: int
        :param file_limit: Value for the file limit.
        :type window: int
        :param window: Value for the (remote) window.
        :type frame_size: int
        :param frame_size: Frame size, together with the header size it
        bounds the maximum window value (`window_m`).
        """

        netius.Stream.reset(self)

        # storage related values
        self.store = store
        self.file_limit = file_limit

        # remote window and the derived per frame maximum
        self.window = window
        self.window_m = min(self.window, frame_size - HEADER_SIZE)

        # local window values, taken from the connection, the
        # threshold is half of the original local window
        local_window = self.connection.window_o
        self.window_o = local_window
        self.window_l = local_window
        self.window_t = local_window // 2

        # request related state
        self.pending_s = 0
        self.headers = self.headers_l = None
        self.method = self.path_s = None
        self.version = HTTP_20
        self.version_s = "HTTP/2.0"
        self.encodings = None
        self.chunked = False
        self.keep_alive = True
        self.content_l = -1
        self.frames = 0

        # availability flag and data buffer values
        self._available = True
        self._data_b = None
        self._data_l = -1
×
994

995
    def open(self):
        """
        Opens the stream, nothing is done in case it's already open.

        After the parent open operation a header decoding is tried,
        it returns without decoding if the headers are not yet complete
        (continuation frames pending).
        """

        if not self.status == netius.OPEN:
            netius.Stream.open(self)
            self.decode_headers()
×
1008

1009
    def close(self, flush = False, destroy = True, reset = True):
        """
        Closes the stream, nothing is done in case it's already closed.

        :type flush: bool
        :param flush: If a graceful close should be tried, in which case
        (and if the stream is ready) an empty part is sent instead of
        the reset of the stream.
        :type destroy: bool
        :param destroy: Not used by this implementation.
        :type reset: bool
        :param reset: If the peer should be notified about the closing,
        either with the empty part or with the stream reset.
        """

        # avoids the duplicated closing of the stream
        if self.status == netius.CLOSED:
            return

        # notifies the peer, the graceful approach (empty final part)
        # is only used when requested and when the stream is ready
        if reset:
            if flush and self.is_ready:
                self.send_part(b"")
            else:
                self.send_reset()

        # runs the parent (upper layer) close operation
        netius.Stream.close(self)

        # in case the stream is still registered in the owner removes
        # it and then clears the internal structures of the stream
        if self.owner._has_stream(self.identifier):
            self.owner._del_stream(self.identifier)
            self.reset()
×
1038

1039
    def info_dict(self, full = False):
        """
        Builds the information dictionary of the stream, extending the
        one from the parent class with the stream specific values.

        :type full: bool
        :param full: Flag passed to the parent implementation.
        :rtype: Dictionary
        :return: The dictionary with the information on the stream.
        """

        info = netius.Stream.info_dict(self, full = full)

        # values that are a direct copy of the stream attributes
        names = (
            "identifier",
            "dependency",
            "weight",
            "exclusive",
            "end_headers",
            "end_stream",
            "end_stream_l",
            "store",
            "file_limit",
            "window",
            "window_m",
            "window_o",
            "window_l",
            "window_t",
            "pending_s",
            "headers",
            "method",
            "path_s",
            "version",
            "version_s",
            "encodings",
            "chunked",
            "keep_alive",
            "content_l",
            "frames"
        )
        values = [(name, getattr(self, name)) for name in names]

        # values that are computed at request time
        values.append(
            ("available", self.connection.available_stream(self.identifier, 1))
        )
        values.append(("exhausted", self.is_exhausted()))
        values.append(("restored", self.is_restored()))
        values.append(("_available", self._available))

        info.update(values)
        return info
×
1073

1074
    def available(self):
        """
        Marks the stream as available and triggers the "on_available"
        event in the owner, to be called when a blocked stream becomes
        "unblocked" again.
        """

        self._available = True
        owner = self.owner
        owner.trigger("on_available")
×
1083

1084
    def unavailable(self):
        """
        Marks the stream as not available and triggers the
        "on_unavailable" event in the owner, to be called when an
        "unblocked" stream becomes "blocked".
        """

        self._available = False
        owner = self.owner
        owner.trigger("on_unavailable")
×
1093

1094
    def set_encoding(self, encoding):
        """
        Sets the current encoding of the stream.

        :type encoding: int
        :param encoding: The encoding value to be stored as current.
        """

        self.current = encoding
×
1096

1097
    def set_uncompressed(self):
        """
        Changes the current encoding to the chunked one in case the
        current value is at least the chunked one, otherwise changes
        it to the plain encoding.
        """

        at_least_chunked = self.current >= http.CHUNKED_ENCODING
        if at_least_chunked:
            self.current = http.CHUNKED_ENCODING
        else:
            self.current = http.PLAIN_ENCODING
×
1101

1102
    def set_plain(self):
        """
        Sets the plain encoding as the current one.
        """

        encoding = http.PLAIN_ENCODING
        self.set_encoding(encoding)
×
1104

1105
    def set_chunked(self):
        """
        Sets the chunked encoding as the current one.
        """

        encoding = http.CHUNKED_ENCODING
        self.set_encoding(encoding)
×
1107

1108
    def set_gzip(self):
        """
        Sets the gzip encoding as the current one.
        """

        encoding = http.GZIP_ENCODING
        self.set_encoding(encoding)
×
1110

1111
    def set_deflate(self):
        """
        Sets the deflate encoding as the current one.
        """

        encoding = http.DEFLATE_ENCODING
        self.set_encoding(encoding)
×
1113

1114
    def is_plain(self):
        """
        Checks if the current encoding is the plain one.

        :rtype: bool
        :return: If the current encoding equals the plain encoding.
        """

        return self.current == http.PLAIN_ENCODING
×
1116

1117
    def is_chunked(self):
        """
        Checks if the current encoding value is greater than the
        plain encoding one.

        :rtype: bool
        :return: If the current encoding is above the plain encoding.
        """

        return self.current > http.PLAIN_ENCODING
×
1119

1120
    def is_gzip(self):
        """
        Checks if the current encoding is the gzip one.

        :rtype: bool
        :return: If the current encoding equals the gzip encoding.
        """

        return self.current == http.GZIP_ENCODING
×
1122

1123
    def is_deflate(self):
        """
        Checks if the current encoding is the deflate one.

        :rtype: bool
        :return: If the current encoding equals the deflate encoding.
        """

        return self.current == http.DEFLATE_ENCODING
×
1125

1126
    def is_compressed(self):
        """
        Checks if the current encoding value is greater than the
        chunked encoding one.

        :rtype: bool
        :return: If the current encoding is above the chunked encoding.
        """

        return self.current > http.CHUNKED_ENCODING
×
1128

1129
    def is_uncompressed(self):
        """
        Checks if the stream is not compressed, this is the negation
        of the is compressed check.

        :rtype: bool
        :return: If the stream is not considered compressed.
        """

        compressed = self.is_compressed()
        return not compressed
×
1131

1132
    def is_flushed(self):
        """
        Checks if the current encoding value is greater than the
        plain encoding one.

        :rtype: bool
        :return: If the current encoding is above the plain encoding.
        """

        return self.current > http.PLAIN_ENCODING
×
1134

1135
    def is_measurable(self, strict = True):
        """
        Checks if the stream is measurable, which is the case only
        when it's not compressed.

        :type strict: bool
        :param strict: Not used by this implementation.
        :rtype: bool
        :return: If the stream is considered measurable.
        """

        return not self.is_compressed()
×
1138

1139
    def is_exhausted(self):
        """
        Checks if the stream is exhausted, meaning that the pending
        value is above the connection's maximum or that the stream
        is currently not available.

        :rtype: bool
        :return: If the stream is considered exhausted.
        """

        over_limit = self.pending_s > self.connection.max_pending
        return over_limit or not self._available
×
1143

1144
    def is_restored(self):
        """
        Checks if the stream is restored, meaning that the pending
        value is not above the connection's minimum and that the
        stream is currently available.

        :rtype: bool
        :return: If the stream is considered restored.
        """

        if self.pending_s > self.connection.min_pending or not self._available:
            return False
        return True
×
1148

1149
    def decode_headers(self, force = False, assert_h = True):
        """
        Decodes the buffered header fragments into the list of headers
        using the owner's decoder, clearing the header buffer afterwards.

        :type force: bool
        :param force: If the decoding should run even when the headers
        are not marked as ended or when they have already been decoded.
        :type assert_h: bool
        :param assert_h: If the decoded headers should be asserted.
        """

        # without the force flag the decoding requires the complete
        # set of headers and is only performed once
        if not force:
            if not self.end_headers:
                return
            if self.headers_l:
                return

        fragments = self.header_b
        if not fragments:
            return

        # avoids the join operation for the single fragment case
        if len(fragments) > 1:
            block = b"".join(fragments)
        else:
            block = fragments[0]

        self.headers_l = self.owner.decoder.decode(block)
        self.header_b = []
        if assert_h:
            self.assert_headers()
×
1158

1159
    def extend_headers(self, fragment):
        """
        Appends the provided fragment to the header data buffer,
        may be used for data coming from a continuation frame.

        :type fragment: String
        :param fragment: The fragment of header data to be added
        to the end of the header buffer.
        """

        fragments = self.header_b
        fragments.append(fragment)
×
1171

1172
    def extend_data(self, data):
        """
        Accounts for a new chunk of data in the stream, the length
        counter is always updated, the chunk itself is only written
        to the data buffer when the store flag is set.

        :type data: String
        :param data: The chunk of data received for the stream.
        """

        self._data_l = self._data_l + len(data)
        if self.store:
            self._data_b.write(data)
×
1189

1190
    def remote_update(self, increment):
        """
        Adds the provided increment to the remote window value of
        the stream, the window that controls the output of bytes.

        :type increment: int
        :param increment: The value (in bytes) to be added to the
        remote window, may be negative.
        """

        self.window = self.window + increment
×
1203

1204
    def local_update(self, increment):
        """
        Adds the provided increment to the local window value and,
        in case the resulting value falls below the threshold, sends
        a window update frame restoring the window to its original
        value.

        :type increment: int
        :param increment: The value (in bytes) to be added to the
        local window of the stream.
        """

        self.window_l += increment

        # only below the threshold is the window update required
        if not self.window_l >= self.window_t:
            missing = self.window_o - self.window_l
            self.connection.send_window_update(
                increment=missing,
                stream=self.identifier
            )
            self.window_l = self.window_o
×
1228

1229
    def get_path(self, normalize = False):
        """
        Retrieves the path of the request, the part of the path
        string that comes before the first question mark.

        :type normalize: bool
        :param normalize: If a value starting with an HTTP or HTTPS
        scheme (absolute URL) should be reduced to its path component.
        :rtype: String
        :return: The path associated with the current request.
        """

        path = self.path_s.split("?", 1)[0]
        is_absolute = path.startswith(("http://", "https://"))
        if normalize and is_absolute:
            return netius.legacy.urlparse(path).path
        return path
×
1250

1251
    def get_query(self):
        """
        Retrieves the query part of the path, everything that comes
        after the first question mark of the path string.

        :rtype: String
        :return: The query part of the path, an empty string in case
        there's no question mark in the path.
        """

        parts = self.path_s.split("?", 1)
        return parts[1] if len(parts) > 1 else ""
×
1267

1268
    def get_message_b(self, copy = False, size = 40960):
        """
        Retrieves the file like object holding the message data,
        always positioned at the beginning of its contents.

        Should only be called once the complete message has been
        received, otherwise partial contents are returned.

        :type copy: bool
        :param copy: If a new file object with a copy of the contents
        should be returned instead of the internal one, the caller
        becomes responsible for the returned object.
        :type size: int
        :param size: The size (in bytes) of the chunks used in the
        copy operation.
        :rtype: File
        :return: The file like object with the message contents.
        """

        source = self._data_b
        source.seek(0)
        if not copy:
            return source

        # the copy is file system based only when the store flag is
        # set and the content length reaches the file limit
        if self.store and self.content_l >= self.file_limit:
            target = tempfile.NamedTemporaryFile(mode = "w+b")
        else:
            target = netius.legacy.BytesIO()

        try:
            # copies the contents chunk by chunk until no more
            # data is returned by the source
            chunk = source.read(size)
            while chunk:
                target.write(chunk)
                chunk = source.read(size)
        finally:
            # both files are always rewound, even on failure
            source.seek(0)
            target.seek(0)

        return target
×
1324

1325
    def get_encodings(self):
        """
        Retrieves the list of encodings from the "accept-encoding"
        header, the list is computed once and then re-used.

        :rtype: List
        :return: The stripped values of the comma separated header.
        """

        if self.encodings is None:
            raw = self.headers.get("accept-encoding", "")
            self.encodings = [item.strip() for item in raw.split(",")]
        return self.encodings
×
1330

1331
    def fragment(self, data):
        """
        Generator that splits the provided data into fragments, the
        first one limited by the smallest of the connection window,
        the stream window and the maximum window values and the
        remaining ones limited by the maximum window value.

        A negative window is handled as an empty one (empty first
        fragment) and a non positive maximum window makes the
        remaining data be yielded as a single fragment, so that the
        iteration always terminates.

        :type data: String
        :param data: The data to be split into fragments.
        :rtype: Generator
        :return: The generator of the sequence of data fragments.
        """

        reference = min(
            self.connection.window,
            self.window,
            self.window_m
        )

        # a negative value would slice the data from the end, yielding
        # more data than the one allowed by the (exhausted) window
        reference = max(reference, 0)

        yield data[:reference]
        data = data[reference:]

        while data:
            size = self.window_m

            # no valid fragment size exists, slicing would never
            # consume the data, yields the remainder in one piece
            if size <= 0:
                yield data
                return

            yield data[:size]
            data = data[size:]
×
1342

1343
    def fragmentable(self, data):
        """
        Checks if the provided data should be fragmented, which is
        the case when it's larger than the maximum window or than
        the stream window values.

        :type data: String
        :param data: The data to be checked.
        :rtype: bool
        :return: If the data should be fragmented, always false for
        empty data or for a zero valued maximum window.
        """

        if not data:
            return False
        if self.window_m == 0:
            return False
        length = len(data)
        return length > self.window_m or length > self.window
×
1349

1350
    def flush(self, *args, **kwargs):
        """
        Runs the flush operation of the connection under the request
        context of the stream, returns zero if the stream is not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.flush(*args, **kwargs)
×
1354

1355
    def flush_s(self, *args, **kwargs):
        """
        Runs the `flush_s` operation of the connection under the request
        context of the stream, returns zero if the stream is not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.flush_s(*args, **kwargs)
×
1359

1360
    def send_response(self, *args, **kwargs):
        """
        Runs the send response operation of the connection under the
        request context of the stream, returns zero if the stream is
        not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.send_response(*args, **kwargs)
×
1364

1365
    def send_header(self, *args, **kwargs):
        """
        Runs the send header operation of the connection under the
        request context of the stream, returns zero if the stream is
        not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.send_header(*args, **kwargs)
×
1369

1370
    def send_part(self, *args, **kwargs):
        """
        Runs the send part operation of the connection under the
        request context of the stream, returns zero if the stream is
        not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.send_part(*args, **kwargs)
×
1374

1375
    def send_reset(self, *args, **kwargs):
        """
        Runs the send RST stream operation of the connection under
        the request context of the stream, returns zero if the stream
        is not open.
        """

        if not self.is_open():
            return 0
        context = self.ctx_request(args, kwargs)
        with context:
            return self.connection.send_rst_stream(*args, **kwargs)
×
1379

1380
    def assert_headers(self):
        """
        Validates the decoded list of headers as request headers,
        raising a parser error (protocol error) for the first
        violation that is found.

        The names must be lower cased, the ":status" pseudo-header and
        the "connection" header are not allowed, the "te" header only
        accepts the "trailers" value, pseudo-headers must be valid, not
        duplicated and positioned before the normal headers and the
        ":method", ":scheme" and ":path" pseudo-headers are required.
        """

        def fail(message):
            # every violation is reported in the same way, as a
            # protocol error bound to the current stream
            raise netius.ParserError(
                message,
                stream = self.identifier,
                error_code = PROTOCOL_ERROR
            )

        # set to false as soon as a normal header is found
        leading = True
        found = set()

        for name, value in self.headers_l:
            is_pseudo = name.startswith(":")
            if not is_pseudo:
                leading = False

            if not name.lower() == name:
                fail("Headers must be lower cased")
            if name == ":status":
                fail("Response pseudo-header present")
            if name == "connection":
                fail("Invalid header present")
            if name == "te" and not value == "trailers":
                fail("Invalid value for TE header")
            if is_pseudo and name in found:
                fail("Duplicated pseudo-header value")
            if leading and not name in HTTP2_PSEUDO:
                fail("Invalid pseudo-header")
            if not leading and is_pseudo:
                fail("Pseudo-header positioned after normal header")

            if is_pseudo:
                found.add(name)

        for required in (":method", ":scheme", ":path"):
            if not required in found:
                fail("Missing pseudo-header in request")
                )
1437

1438
    def assert_ready(self):
        """
        Validates the length of the received data against the content
        length value, raising a parser error (protocol error) when a
        content length is defined, data has been received and the two
        values do not match.
        """

        expected = self.content_l
        if expected == -1:
            return

        received = self._data_l
        if received == 0 or received == expected:
            return

        raise netius.ParserError(
            "Invalid content-length header value (missmatch)",
            stream = self.identifier,
            error_code = PROTOCOL_ERROR
        )
1446

1447
    @contextlib.contextmanager
    def ctx_request(self, args = None, kwargs = None):
        """
        Context manager that prepares the keyword arguments and the
        shared state for an operation run on behalf of the stream.

        The "stream" keyword argument is set with the identifier of
        the stream and a possible "callback" is wrapped so that it
        receives the stream. During the context the connection's current
        (encoding) value and the owner's stream object are replaced
        with the stream's ones, being restored at the end.

        :type args: Tuple
        :param args: Not used by this implementation.
        :type kwargs: Dictionary
        :param kwargs: The keyword arguments of the operation, changed
        in place.
        """

        if kwargs is None:
            kwargs = dict()

        # identifies the stream in the operation and wraps a possible
        # callback with the stream based clojure
        kwargs["stream"] = self.identifier
        callback = kwargs.get("callback", None)
        if callback:
            kwargs["callback"] = self._build_c(callback)

        # saves the original values so that they can be restored
        previous_current = self.connection.current
        previous_stream = self.owner.stream_o

        # sets the stream based values for the duration of the context
        self.connection.current = self.current
        self.owner.stream_o = self

        try:
            yield
        finally:
            self.owner.stream_o = previous_stream
            self.connection.current = previous_current
×
1483

1484
    @property
    def parser(self):
        """
        The parser of the stream, which is the stream itself.
        """

        return self
×
1487

1488
    @property
    def is_ready(self, calculate = True, assert_r = True):
        """
        Determines if the stream is ready for processing, meaning
        that it's open and that both the headers and the stream have
        been marked as ended.

        As this is a property the arguments always take their
        default values.

        :type calculate: bool
        :param calculate: If the calculate operation should be run
        before the verification of the end flags.
        :type assert_r: bool
        :param assert_r: If the assert ready operation should be run
        for a stream that is considered ready.
        :rtype: bool
        :return: If the stream is ready for processing.
        """

        if not self.is_open():
            return False
        if calculate:
            self._calculate()
        if not (self.end_headers and self.end_stream):
            return False
        if assert_r:
            self.assert_ready()
        return True
×
1512

1513
    @property
    def is_headers(self):
        """
        If the headers have been marked as ended, this is the value
        of the end headers flag.
        """

        return self.end_headers
×
1516

1517
    def _calculate(self):
        """
        Computes the header based values and builds the data buffer,
        this is only done once and only after the headers have been
        marked as ended.

        The content length is taken from the "content-length" header
        (minus one if not present) and converted into an integer.
        """

        already_built = self._data_b is not None
        if already_built or not self._data_l == -1:
            return
        if not self.is_headers:
            return

        self._calculate_headers()

        # stores the raw value first and only then the integer one
        # (an empty value is kept unchanged)
        length = self.headers.get("content-length", -1)
        self.content_l = length
        if length:
            self.content_l = int(length)

        self._data_b = self._build_b()
        self._data_l = 0
×
1526

1527
    def _calculate_headers(self):
        """
        Builds the headers dictionary from the decoded list of headers
        and sets the method and path values from the pseudo-headers.

        Repeated (normal) headers are stored as a list of values and
        the ":authority" pseudo-header, if set, is exposed as the "host"
        header. Requires the headers to be ended and not yet calculated.
        """

        util.verify(self.is_headers)
        util.verify(self.headers == None)

        regular = dict()
        special = dict()

        for key, value in self.headers_l:
            if not type(key) == str:
                key = str(key)
            if not type(value) == str:
                value = str(value)

            # a repeated header gets its values aggregated in a list
            if key in regular:
                previous = regular[key]
                if not type(previous) == list:
                    previous = [previous]
                previous.append(value)
                value = previous

            # pseudo-headers are kept apart from the normal ones
            target = special if key.startswith(":") else regular
            target[key] = value

        authority = special.get(":authority", None)
        if authority:
            regular["host"] = authority

        self.headers = regular

        method = special.get(":method", None)
        path = special.get(":path", None)
        self.method = str(method) if method else method
        self.path_s = str(path) if path else path
×
1557

1558
    def _build_b(self):
        """
        Builds the file like object that is going to store the data
        (payload) of the message.

        A named temporary file is used when the store flag is set and
        the content length reaches the file limit, otherwise an in
        memory buffer is used.

        :rtype: File
        :return: The file like object to store the message payload.
        """

        if self.store and self.content_l >= self.file_limit:
            return tempfile.NamedTemporaryFile(mode = "w+b")
        return netius.legacy.BytesIO()
×
1575

1576
    def _build_c(self, callback, validate = True):
        """
        Builds a clojure around the provided callback, the resulting
        function receives the connection and calls the callback with
        the stream instead.

        :type callback: Function
        :param callback: The function to be called with the stream.
        :type validate: bool
        :param validate: If the callback should be skipped when the
        stream is not open at the time of the call.
        :rtype: Function
        :return: The clojure function to be used as the callback.
        """

        def inner(connection):
            # the connection argument is ignored, the stream is
            # the value that is passed to the original callback
            if not validate or self.is_open():
                callback(self)

        return inner
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc