• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tcalmant / ipopo / 17025960620

17 Aug 2025 09:16PM UTC coverage: 84.662% (+1.3%) from 83.323%
17025960620

Pull #176

github

web-flow
Merge 850c581e1 into 80da9234f
Pull Request #176: HTTP asynchronous server

653 of 761 new or added lines in 4 files covered. (85.81%)

182 existing lines in 4 files now uncovered.

14970 of 17682 relevant lines covered (84.66%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.49
/pelix/http/basic_async.py
1
#!/usr/bin/env python
2
# -- Content-Encoding: UTF-8 --
3
"""
1✔
4
Pelix basic asynchronous HTTP service bundle.
5

6
Provides an implementation of the Pelix HTTP service based on aiohttp.
7

8
:author: Thomas Calmant
9
:copyright: Copyright 2025, Thomas Calmant
10
:license: Apache License 2.0
11
:version: 3.1.0
12

13
..
14

15
    Copyright 2025 Thomas Calmant
16

17
    Licensed under the Apache License, Version 2.0 (the "License");
18
    you may not use this file except in compliance with the License.
19
    You may obtain a copy of the License at
20

21
        https://www.apache.org/licenses/LICENSE-2.0
22

23
    Unless required by applicable law or agreed to in writing, software
24
    distributed under the License is distributed on an "AS IS" BASIS,
25
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26
    See the License for the specific language governing permissions and
27
    limitations under the License.
28
"""
29

30
import asyncio
4✔
31
import concurrent.futures
4✔
32
import io
4✔
33
import logging
4✔
34
import re
4✔
35
import socket
4✔
36
import ssl
4✔
37
import sys
4✔
38
import threading
4✔
39
import traceback
4✔
40
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
4✔
41

42
import aiohttp.client_exceptions
4✔
43
import aiohttp.web
4✔
44

45
import pelix.constants as fw_constants
4✔
46
import pelix.http as http
4✔
47
import pelix.ipopo.constants as constants
4✔
48
import pelix.remote
4✔
49
import pelix.utilities as utilities
4✔
50
from pelix.internals.registry import ServiceReference
4✔
51
from pelix.ipopo.decorators import (
4✔
52
    BindField,
53
    ComponentFactory,
54
    HiddenProperty,
55
    Invalidate,
56
    Property,
57
    Provides,
58
    Requires,
59
    UnbindField,
60
    UpdateField,
61
    Validate,
62
)
63

64
if TYPE_CHECKING:
1✔
65
    from pelix.framework import BundleContext
66

67

68
# ------------------------------------------------------------------------------
69

70
# Module version
71
__version_info__ = (3, 1, 0)
__version__ = ".".join(map(str, __version_info__))

# Documentation strings format
__docformat__ = "restructuredtext en"

# ------------------------------------------------------------------------------

HTTP_SERVICE_EXTRA = "http.extra"
""" HTTP service extra properties (dictionary) """

DEFAULT_BIND_ADDRESS = "0.0.0.0"
""" By default, bind to all IPv4 interfaces """

LOCALHOST_ADDRESS = "127.0.0.1"
"""
Local address, if None is given as binding address, instead of the default one
"""
89

90

91
class _SyncHTTPServletRequest(http.AbstractHTTPServletRequest):
    """
    HTTP Servlet request helper for synchronous servlets.

    Wraps an aiohttp request together with its body, given as bytes, so that
    the servlet can access it through a blocking file-like object.
    """

    def __init__(self, request: aiohttp.web.Request, full_path: str, prefix: str, content: bytes) -> None:
        """
        Sets up the request helper

        :param request: The aiohttp Request object
        :param full_path: The full request path, including the prefix
        :param prefix: The path to the servlet root
        :param content: The request content
        """
        self._request = request
        self._prefix = prefix
        self._content = content

        # Compute the sub path: what follows the prefix, with a leading slash
        self._sub_path = full_path[len(prefix) :]
        if not self._sub_path.startswith("/"):
            self._sub_path = f"/{self._sub_path}"

        # Collapse repeated slashes
        self._sub_path = re.sub("/+", "/", self._sub_path)

    def get_command(self) -> str:
        """
        Returns the HTTP verb (GET, POST, ...) used for the request
        """
        return self._request.method.upper()

    def get_client_address(self) -> Tuple[str, int]:
        """
        Retrieves the address of the client

        :return: A (host, port) tuple
        :raise IOError: No transport or no peer address is available
        """
        transport = self._request.transport
        if transport is None:
            # No transport, no address
            raise IOError("No transport available for the request")

        peername = transport.get_extra_info("peername")
        if peername is None:
            # get_extra_info() returns None when the information is missing
            raise IOError("No peer address available for the request")

        # Keep only the first two items (IPv6 peer names have four)
        return peername[:2]

    def get_header(self, name: str, default: Optional[Any] = None) -> Any:
        """
        Retrieves the value of a header

        :param name: Header name
        :param default: Value to return if the header is missing
        :return: The header value or the default one
        """
        return self._request.headers.get(name, default)

    def get_headers(self) -> Dict[str, Any]:
        """
        Retrieves all headers

        :return: The headers mapping of the aiohttp request
        """
        return cast(Dict[str, Any], self._request.headers)

    def get_path(self) -> str:
        """
        Retrieves the request full path
        """
        return self._request.path

    def get_prefix_path(self) -> str:
        """
        Returns the path to the servlet root

        :return: A request path (string)
        """
        return self._prefix

    def get_sub_path(self) -> str:
        """
        Returns the servlet-relative path, i.e. after the prefix

        :return: A request path (string)
        """
        return self._sub_path

    def get_rfile(self) -> IO[bytes]:
        """
        Retrieves the input as a file stream

        :return: A new in-memory stream over the request content
        """
        return io.BytesIO(self._content)
×
173

174

175
class _WriteWrapper(IO[bytes]):
4✔
176
    def __init__(self):
4✔
177
        self._buffer = io.BytesIO()
4✔
178
        self._closed = False
4✔
179

180
    def get(self) -> bytes:
4✔
181
        """
182
        Retrieves the written data as bytes.
183
        This method should be called after the response has been sent.
184

185
        :return: The written data
186
        """
187
        return self._buffer.getvalue()
4✔
188

189
    def read(self, size: int = -1) -> bytes:
4✔
NEW
190
        raise IOError("This stream is not readable")
×
191

192
    def write(self, b: bytes) -> int:
4✔
193
        return self._buffer.write(b)
4✔
194

195
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
4✔
NEW
196
        raise IOError("This stream is not seekable")
×
197

198
    def tell(self) -> int:
4✔
NEW
199
        raise IOError("This stream is not seekable")
×
200

201
    def close(self) -> None:
4✔
202
        self._closed = True
4✔
203

204
    def flush(self) -> None:
4✔
NEW
205
        pass
×
206

207
    def readable(self) -> bool:
4✔
NEW
208
        return False
×
209

210
    def writable(self) -> bool:
4✔
NEW
211
        return True
×
212

213
    def seekable(self) -> bool:
4✔
NEW
214
        return False
×
215

216
    def closed(self) -> bool:
4✔
NEW
217
        return self._closed
×
218

219

220
class _SyncHTTPServletResponse(http.AbstractHTTPServletResponse):
    """
    Response helper given to synchronous servlets.

    Status, headers and body are stored in memory and converted to an aiohttp
    response by ``to_aiohttp_response()``.
    """

    def __init__(self, request: aiohttp.web.Request, loop: asyncio.AbstractEventLoop) -> None:
        """
        Prepares an empty "200" response

        :param request: The aiohttp Request object
        :param loop: The asyncio event loop
        """
        self._request = request
        self._loop = loop

        # Response line
        self._code: int = 200
        self._message: Optional[str] = None

        # Headers (lower-case name -> value) and their "sealed" flag
        self._headers: Dict[str, str] = {}
        self._headers_set: bool = False

        # Body buffer
        self._writer = _WriteWrapper()

    def to_aiohttp_response(self) -> aiohttp.web.StreamResponse:
        """
        Builds the aiohttp response matching the current state of this object.
        This method should be called after all headers have been set.

        :return: The aiohttp StreamResponse object
        """
        body = self._writer.get()
        return aiohttp.web.Response(body=body, status=self._code, reason=self._message, headers=self._headers)

    def set_response(self, code: int, message: Optional[str] = None) -> None:
        """
        Sets the response line.
        This method should be the first called when sending an answer.

        :param code: HTTP result code
        :param message: Associated message
        :raise IOError: Headers have already been ended
        """
        if self._headers_set:
            raise IOError("Headers have already been set, cannot change the response code")

        self._code, self._message = code, message

    def set_header(self, name: str, value: Any) -> None:
        """
        Sets the value of a header (None removes it).
        This method should not be called after ``end_headers()``.

        :param name: Header name
        :param value: Header value
        :raise IOError: Headers have already been ended
        """
        if self._headers_set:
            raise IOError("Headers have already been set, cannot change them")

        key = name.lower()
        if value is None:
            self._headers.pop(key, None)
            return

        self._headers[key] = str(value)

    def is_header_set(self, name: str) -> bool:
        """
        Checks if the given header has already been set

        :param name: Header name
        :return: True if it has already been set
        """
        key = name.lower()
        return key in self._headers

    def end_headers(self) -> None:
        """
        Ends the headers part: status and headers can't be changed afterwards
        """
        self._headers_set = True

    def get_wfile(self) -> IO[bytes]:
        """
        Retrieves the output as a file stream.
        Headers are ended by this call if ``end_headers()`` has not been
        called before.

        :return: The output file-like object
        """
        if not self._headers_set:
            self.end_headers()

        return self._writer

    def write(self, data: bytes) -> None:
        """
        Writes the given data to the output stream, then flags it as closed.
        Headers are ended by this call if ``end_headers()`` has not been
        called before.

        :param data: Data to be written
        """
        stream = self.get_wfile()
        stream.write(data)
        stream.close()
4✔
320

321

322
# ------------------------------------------------------------------------------
323

324

325
class _AsyncHTTPServletRequest(http.AbstractAsyncHTTPServletRequest):
    """
    HTTP Servlet request helper for asynchronous servlets.

    Wraps an aiohttp request; its body is exposed as a stream reader.
    """

    def __init__(self, request: aiohttp.web.Request, full_path: str, prefix: str) -> None:
        """
        Sets up the request helper

        :param request: The aiohttp Request object
        :param full_path: The full request path, including the prefix
        :param prefix: The path to the servlet root
        """
        self._request = request
        self._prefix = prefix

        # Compute the sub path: what follows the prefix, with a leading slash
        self._sub_path = full_path[len(prefix) :]
        if not self._sub_path.startswith("/"):
            self._sub_path = f"/{self._sub_path}"

        # Collapse repeated slashes
        self._sub_path = re.sub("/+", "/", self._sub_path)

    def get_command(self) -> str:
        """
        Returns the HTTP verb (GET, POST, ...) used for the request
        """
        return self._request.method.upper()

    def get_client_address(self) -> Tuple[str, int]:
        """
        Retrieves the address of the client

        :return: A (host, port) tuple
        :raise IOError: No transport or no peer address is available
        """
        transport = self._request.transport
        if transport is None:
            # No transport, no address
            raise IOError("No transport available for the request")

        peername = transport.get_extra_info("peername")
        if peername is None:
            # get_extra_info() returns None when the information is missing
            raise IOError("No peer address available for the request")

        # Keep only the first two items (IPv6 peer names have four)
        return peername[:2]

    async def get_header(self, name: str, default: Optional[Any] = None) -> Any:
        """
        Retrieves the value of a header

        :param name: Header name
        :param default: Value to return if the header is missing
        :return: The header value or the default one
        """
        return self._request.headers.get(name, default)

    async def get_headers(self) -> Dict[str, Any]:
        """
        Retrieves all headers

        :return: The headers mapping of the aiohttp request
        """
        return cast(Dict[str, Any], self._request.headers)

    def get_path(self) -> str:
        """
        Retrieves the request full path
        """
        return self._request.path

    def get_prefix_path(self) -> str:
        """
        Returns the path to the servlet root

        :return: A request path (string)
        """
        return self._prefix

    def get_sub_path(self) -> str:
        """
        Returns the servlet-relative path, i.e. after the prefix

        :return: A request path (string)
        """
        return self._sub_path

    def get_rfile(self) -> asyncio.StreamReader:
        """
        Retrieves the input as a file stream

        :return: The content stream of the aiohttp request
        """
        # aiohttp StreamReader is compatible with the asyncio one
        return cast(asyncio.StreamReader, self._request.content)
×
406

407

408
class _AioHttpWriter(http.AbstractAsyncWriter):
    """
    Asynchronous writer backed by an aiohttp StreamResponse
    """

    def __init__(self, response: aiohttp.web.StreamResponse) -> None:
        """
        :param response: The aiohttp response to write to
        """
        self._response = response

    async def write(self, raw: bytes) -> int:
        """
        Writes the given bytes to the response

        :param raw: Data to write
        :return: The length of the given data
        """
        target = self._response
        await target.write(raw)
        return len(raw)

    async def flush(self) -> None:
        """
        Waits for the response write buffer to be drained
        """
        target = self._response
        await target.drain()
×
422

423

424
class _AsyncHTTPServletResponse(http.AbstractAsyncHTTPServletResponse):
    """
    HTTP Servlet response helper for asynchronous servlets.

    Wraps an aiohttp StreamResponse and adds Server-Sent Events helpers.
    """

    # Line breaks recognized by the Server-Sent Events stream format
    _SSE_LINE_BREAK = re.compile(r"\r\n|\r|\n")

    def __init__(self, request: aiohttp.web.Request) -> None:
        """
        Sets up the response helper

        :param request: The aiohttp Request object
        """
        self._request = request
        self._headers_set: bool = False
        self._sse_set: bool = False
        self._response = aiohttp.web.StreamResponse()

    def to_aiohttp_response(self) -> aiohttp.web.StreamResponse:
        """
        Converts the response to an aiohttp StreamResponse object.
        This method should be called after all headers have been set.

        :return: The aiohttp StreamResponse object
        """
        return self._response

    def set_response(self, code: int, message: Optional[str] = None) -> None:
        """
        Sets the response line.
        This method should be the first called when sending an answer.

        :param code: HTTP result code
        :param message: Associated message
        :raise IOError: Headers have already been ended
        """
        if self._headers_set:
            raise IOError("Headers have already been set, cannot change the response code")

        self._response.set_status(code, message)

    def set_header(self, name: str, value: Any) -> None:
        """
        Sets the value of a header (None removes all its values).
        This method should not be called after ``end_headers()``.

        :param name: Header name
        :param value: Header value
        :raise IOError: Headers have already been ended
        """
        if self._headers_set:
            raise IOError("Headers have already been set, cannot change them")

        if value is None:
            self._response.headers.popall(name.lower(), None)
        else:
            # NOTE(review): add() appends a value instead of replacing the
            # existing one(s), unlike the synchronous response helper of this
            # module - confirm this is intended
            self._response.headers.add(name.lower(), str(value))

    def is_header_set(self, name: str) -> bool:
        """
        Checks if the given header has already been set

        :param name: Header name
        :return: True if it has already been set
        """
        return self._response.headers.get(name.lower(), None) is not None

    def setup_sse(self, strict: bool = True) -> None:
        """
        Sets up the response for Server-Sent Events (SSE).
        Does nothing if the response has already been set up for SSE.

        :param strict: If True, raises an error if the request is not for SSE
        :raise ValueError: Strict mode and the request doesn't accept
                           ``text/event-stream``
        :raise IOError: Headers have already been ended
        """
        if self._sse_set:
            # Already set up for SSE
            return

        if strict and not any(
            "text/event-stream" in accepted for accepted in self._request.headers.getall("accept", "")
        ):
            raise ValueError("Cannot set up SSE for a non-SSE request")

        if self._headers_set:
            raise IOError("Headers have already been set, cannot change them")

        self._response.headers["Content-Type"] = "text/event-stream"
        self._response.headers["Cache-Control"] = "no-cache"
        self._response.headers["Connection"] = "keep-alive"
        self._sse_set = True

    async def end_headers(self) -> None:
        """
        Ends the headers part and prepares the aiohttp response
        """
        self._headers_set = True
        await self._response.prepare(self._request)

    def get_wfile(self) -> http.AbstractAsyncWriter:
        """
        Retrieves the output as a writer.
        ``end_headers()`` should have been called before.

        :return: A writer for the output stream
        """
        return _AioHttpWriter(self._response)

    async def write(self, data: bytes) -> None:
        """
        Writes the given data.
        ``end_headers()`` should have been called before.

        :param data: Data to be written
        """
        await self._response.write(data)

    async def send_sse(self, data: str, event: str | None = None, id: str | None = None) -> None:
        """
        Sends a Server-Sent Event (SSE) message.

        Multi-line data is sent as one ``data`` field per line, so that line
        breaks in the payload can't break the framing of the event.

        :param data: The event data (an empty value sends an empty data field)
        :param event: Optional event type (e.g., "message", "update")
        :param id: Optional event ID (set to "" to reset the ID)
        :raise IOError: SSE has not been set up or the client connection has
                        been reset
        :raise ValueError: The event type or ID contains a line break
        """
        if not self._sse_set:
            raise IOError("SSE not set up, call setup_sse() first")

        # A line break in a single-line field would inject extra fields
        for field_name, field_value in (("id", id), ("event", event)):
            if field_value and self._SSE_LINE_BREAK.search(field_value):
                raise ValueError(f"SSE {field_name} must not contain line breaks")

        # Prepare the SSE message
        parts: list[str] = []
        if id:
            parts.append(f"id: {id}")
        elif id is not None:
            # Reset ID
            parts.append("id")

        if event:
            parts.append(f"event: {event}")
        elif event is not None:
            parts.append("event")

        # One field per line of data; a bare "data" is an empty data line
        for line in self._SSE_LINE_BREAK.split(data or ""):
            parts.append(f"data: {line}" if line else "data")

        # End of the event: the message must end with an empty line
        parts.append("")
        parts.append("")

        try:
            await self.write("\n".join(parts).encode("utf-8"))
        except aiohttp.client_exceptions.ClientConnectionResetError:
            raise IOError("Client connection reset during SSE send") from None
4✔
574

575

576
class WSSession(http.WebSocketSession):
    """
    WebSocket session backed by an aiohttp WebSocketResponse
    """

    def __init__(
        self,
        ws_handler: http.WebSocketHandler,
        servlet_request: _AsyncHTTPServletRequest,
        ws_response: aiohttp.web.WebSocketResponse,
    ) -> None:
        """
        Stores the objects associated to the session

        :param ws_handler: The WebSocket handler
        :param servlet_request: The servlet request
        :param ws_response: The WebSocket response
        """
        self._handler = ws_handler
        self._request = servlet_request
        self._response = ws_response

    def _check_open(self) -> None:
        """
        Ensures that the underlying WebSocket is still open

        :raise IOError: The WebSocket has been closed
        """
        if self._response.closed:
            raise IOError("WebSocket session is closed")

    def get_client_address(self) -> Tuple[str, int]:
        """
        Returns the address of the client, as given by the servlet request

        :return: A (host, port) tuple
        """
        request = self._request
        return request.get_client_address()

    async def send_binary(self, message: bytes) -> None:
        """
        Sends a binary message to the client

        :param message: Binary message to send
        :raise IOError: The WebSocket has been closed
        """
        self._check_open()
        await self._response.send_bytes(message)

    async def send_text(self, message: str) -> None:
        """
        Sends a text message to the client

        :param message: Message to send
        :raise IOError: The WebSocket has been closed
        """
        self._check_open()
        await self._response.send_str(message)

    async def close(self, code: int = 1000, reason: str | None = None) -> None:
        """
        Closes the WebSocket session, if it is not already closed

        :param code: Close code (default is 1000, normal closure)
        :param reason: Optional reason for the closure
        """
        if self._response.closed:
            return

        payload = (reason or "").encode("utf-8")
        await self._response.close(code=code, message=payload)
4✔
633

634

635
# ------------------------------------------------------------------------------
636

637

638
@ComponentFactory(http.FACTORY_HTTP_ASYNC)
4✔
639
@Provides(http.HTTP_SERVICE)
4✔
640
@Requires("_servlets_services", http.Servlet, True, True)
4✔
641
@Requires("_servlets_async_services", http.AsyncServlet, True, True)
4✔
642
@Requires("_websocket_handler_services", http.WebSocketHandler, True, True)
4✔
643
@Requires("_error_handler", http.ErrorHandler, optional=True)
4✔
644
@Property("_address", http.HTTP_SERVICE_ADDRESS, DEFAULT_BIND_ADDRESS)
4✔
645
@Property("_port", http.HTTP_SERVICE_PORT, 8080)
4✔
646
@Property("_uses_ssl", http.HTTP_USES_SSL, False)
4✔
647
@Property("_cert_file", http.HTTPS_CERT_FILE, None)
4✔
648
@Property("_key_file", http.HTTPS_KEY_FILE, None)
4✔
649
@HiddenProperty("_key_password", http.HTTPS_KEY_PASSWORD, None)
4✔
650
@Property("_extra", HTTP_SERVICE_EXTRA, None)
4✔
651
@Property("_instance_name", constants.IPOPO_INSTANCE_NAME)
4✔
652
@Property("_logger_name", "pelix.http.logger.name", "")
4✔
653
@Property("_logger_level", "pelix.http.logger.level", None)
4✔
654
class AsyncHttpServiceImpl(http.HTTPService):
4✔
655
    """
656
    Asynchronous HTTP service component
657
    """
658

659
    def __init__(self) -> None:
        """
        Sets up the default state of the component
        """
        # Network configuration
        self._address = DEFAULT_BIND_ADDRESS
        self._port = 8080
        self._uses_ssl = False

        # SSL configuration
        self._cert_file: Optional[str] = None
        self._key_file: Optional[str] = None
        self._key_password: Optional[str] = None

        # Other properties
        self._extra: Optional[Dict[str, Any]] = None
        self._instance_name: Optional[str] = None
        self._logger_name: Optional[str] = None
        self._logger_level: str | int | None = None

        # Raised once the component has been validated
        self._validated = False

        # Temporary logger, replaced during validation
        self._logger: logging.Logger = logging.getLogger(f"{__name__}#init")

        # Registry of servlets: path -> (servlet, parameters, type)
        self._lock = threading.RLock()
        self._servlets: Dict[
            str, Tuple[http.Servlet | http.AsyncServlet, Dict[str, Any], http.ServletType]
        ] = {}

        # Services injected by iPOPO
        self._servlets_services: List[http.Servlet] = []
        self._servlets_async_services: List[http.AsyncServlet] = []
        self._websocket_handler_services: List[http.WebSocketHandler] = []
        self._error_handler: Optional[http.ErrorHandler] = None

        # Injected service -> its service reference
        self._binding_lock = threading.RLock()
        self._servlets_refs: Dict[
            http.Servlet | http.AsyncServlet | http.WebSocketHandler,
            ServiceReference[http.Servlet | http.AsyncServlet | http.WebSocketHandler],
        ] = {}

        # Server state
        self._bound_address: tuple[str, int] | None = None
        self._app: aiohttp.web.Application | None = None
        self._thread: threading.Thread | None = None
        self._executor: concurrent.futures.ThreadPoolExecutor | None = None
        self._loop: asyncio.AbstractEventLoop | None = None

        # Start/stop synchronization
        self._start_done_event: utilities.EventData[tuple[str, int]] = utilities.EventData()
        self._stop_event: asyncio.Event = asyncio.Event()
        self._stop_done_event: threading.Event = threading.Event()
4✔
710

711
    def __str__(self) -> str:
        """
        Describes the instance with its configured address and port
        """
        return "BasicHttpService({}, {})".format(self._address, self._port)
×
716

717
    @Validate
    def validate(self, context: "BundleContext") -> None:
        """
        Component validated: normalizes the configuration, starts the server
        in a dedicated thread, waits for it to be bound, then registers the
        servlets bound so far.

        :param context: The bundle context
        :raise IOError: The server didn't start in time or didn't give its
                        bound address
        """
        # Check if we'll use an SSL connection
        self._uses_ssl = self._cert_file is not None

        if not self._address:
            # No address given, use the localhost address
            self._address = LOCALHOST_ADDRESS

        if self._port is None:
            # Random port
            self._port = 0
        else:
            # Ensure we have an integer; a negative value means random port
            self._port = max(int(self._port), 0)

        # Normalize the extra properties
        if not isinstance(self._extra, dict):
            self._extra = {}

        # Set up the logger
        if not self._logger_name:
            # Empty name, use the instance name
            self._logger_name = self._instance_name

        self._logger = logging.getLogger(self._logger_name)

        # Compute the log level: an integer, a level name or a numeric string
        level: int | None = None
        if isinstance(self._logger_level, int):
            level = self._logger_level
        elif self._logger_level is not None:
            level = utilities.get_log_level(self._logger_level)
            if level is None:
                try:
                    level = int(self._logger_level)
                except (TypeError, ValueError):
                    # Invalid level
                    level = None

        # Use setLevel() instead of assigning the "level" attribute: it also
        # clears the cache used by Logger.isEnabledFor()
        self._logger.setLevel(level if level is not None else logging.INFO)

        self.log(
            logging.INFO,
            "Starting HTTP%s server: [%s]:%d ...",
            "S" if self._uses_ssl else "",
            self._address,
            self._port,
        )

        # Create the server: a single catch-all route, handled by this component
        app = aiohttp.web.Application(logger=self._logger)
        app.add_routes([aiohttp.web.route("*", "/{tail:.*}", self.__global_handler)])
        self._app = app

        # Start the server in a separate thread
        self._stop_event.clear()
        self._stop_done_event.clear()
        self._start_done_event.clear()
        self._thread = threading.Thread(target=self._run_server_thread, name="Pelix Async HTTP Server Thread")
        self._thread.daemon = True
        self._thread.start()

        # Wait for the server to be ready
        if not self._start_done_event.wait(10):
            self._logger.error("HTTP server did not start in time")
            raise IOError("HTTP server did not start in time")

        if self._start_done_event.data is None:
            self._logger.error("HTTP server did not bind to an address")
            raise IOError("HTTP server did not bind to an address")

        # Keep the port really used (useful when a random one was requested)
        host, port = self._start_done_event.data
        self._bound_address = (host, port)
        self._port = port

        with self._binding_lock:
            # Set the validation flag up, once the server is ready
            self._validated = True

            # Register bound servlets
            for service, svc_ref in self._servlets_refs.items():
                self.__register_servlet_service(service, svc_ref)

        self._logger.info(
            "HTTP%s server bound to: [%s]:%d ...",
            "S" if self._uses_ssl else "",
            self._address,
            self._port,
        )
814

815
    @Invalidate
    def invalidate(self, context: "BundleContext") -> None:
        """
        Component invalidated: asks the server to stop, waits for its thread
        and releases the event loop.

        :param context: The bundle context
        """
        # Clear the validation flag
        self._validated = False

        self.log(
            logging.INFO,
            "Stopping HTTP%s server: [%s]:%d ...",
            "S" if self._uses_ssl else "",
            self._address,
            self._port,
        )

        # Set the stop event from the loop thread. The callback is only
        # scheduled, not waited for: waiting for a result without timeout
        # would block forever if the loop never runs again.
        loop = self._loop
        if loop is not None and not loop.is_closed():
            try:
                loop.call_soon_threadsafe(self._stop_event.set)
            except RuntimeError:
                # The loop has been closed in the meantime
                self._logger.debug("Event loop already closed, no stop signal to send")

        # Wait for the stop done event to be set
        self._logger.debug("Waiting for the stop done event to be set...")
        if not self._stop_done_event.wait(timeout=5):
            self.log(
                logging.WARNING,
                "The stop done event was not set in time, the server may not have stopped properly",
            )

        # Wait for the server thread to stop
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=0.5)
            if self._thread.is_alive():
                self.log(logging.WARNING, "HTTP server thread did not stop in time")

        self._thread = None

        # Close the event loop
        if loop is not None and not loop.is_closed():
            if loop.is_running():
                # Closing a running loop raises a RuntimeError
                self.log(logging.WARNING, "The event loop is still running and can't be closed")
            else:
                loop.close()

        # Clear references
        self._loop = None
        self._app = None
4✔
862

863
    def _run_server_thread(self) -> None:
        """
        Body of the server thread: creates the event loop of the thread and
        runs the server coroutine on it until it completes.

        Errors are logged, and the "stop done" event is always set on exit.
        """
        try:
            if sys.platform.startswith("win"):
                # aiodns requires a specific event loop on Windows
                asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

            # Create the event loop and associate it to this thread
            loop = asyncio.new_event_loop()
            self._loop = loop
            asyncio.set_event_loop(loop)

            # Run the server until it is asked to stop
            loop.run_until_complete(self._run_server())
        except Exception:
            self._logger.exception("Error running async HTTP server")
        finally:
            current_loop = self._loop
            if current_loop is not None:
                current_loop.stop()

            # Notify that the thread is done
            self._stop_done_event.set()
4✔
884

885
    async def _run_server(self) -> None:
        """
        Runs the aiohttp server until the stop event is set.

        Sets up the application runner and the TCP site (with an SSL context
        when HTTPS is enabled), publishes the bound ``(host, port)`` tuple
        through the start done event, waits for the stop event and finally
        shuts everything down.

        Any error is logged and forwarded through the start done event; it is
        not raised to the caller.
        """
        runner: aiohttp.web.AppRunner | None = None
        cleanup_started = False

        try:
            assert self._app is not None, "Application must be initialized before running the server"

            # Create the server
            runner = aiohttp.web.AppRunner(self._app)
            await runner.setup()

            # Prepare SSL context if needed
            ssl_context: ssl.SSLContext | None = None
            if self._uses_ssl:
                assert self._cert_file is not None, "Certificate file must be set for HTTPS"

                # Create the SSL context
                ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
                ssl_context.load_cert_chain(
                    certfile=self._cert_file, keyfile=self._key_file, password=self._key_password
                )

            # Create the site
            site = aiohttp.web.TCPSite(runner, self._address, self._port, ssl_context=ssl_context)

            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                self._executor = executor
                try:
                    # Start the site
                    await site.start()

                    # Get bound address and port (public runner API instead of
                    # the private server attribute of the site)
                    host, port = runner.addresses[0][:2]

                    # We're ready
                    self._start_done_event.set((host, port))

                    # Keep the coroutine alive until the server is stopped
                    await self._stop_event.wait()

                    # Clean up the server
                    cleanup_started = True
                    await site.stop()
                    await runner.shutdown()
                    await runner.cleanup()
                    await self._app.shutdown()
                    await self._app.cleanup()
                finally:
                    # The pool is shut down when leaving the "with" block:
                    # do not let the request handler submit work to it anymore
                    self._executor = None
        except Exception as ex:
            # Anything went wrong, log the error
            self._logger.error("Error running the HTTP server: %s", ex)
            self._start_done_event.raise_exception(ex)

            if runner is not None and not cleanup_started:
                # Startup failed after the runner was created (e.g. the port is
                # already in use): release what has been set up so far
                try:
                    await runner.cleanup()
                except Exception:
                    self._logger.exception("Error cleaning up the HTTP server after a failure")
933

934
    async def __global_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
4✔
935
        """
936
        Global handler for the Pelix async HTTP service.
937

938
        :param request: The incoming request
939
        :return: The response to send
940
        """
941
        assert self._loop is not None, "EventLoop must be initialized before handling requests"
4✔
942

943
        if self._executor is None:
4✔
944
            # No executor available, cannot handle the request
NEW
945
            return aiohttp.web.Response(status=503, text="Service unavailable")
×
946

947
        # Remove the double-slashes in the request path
948
        path = re.sub("/+", "/", request.path)
4✔
949

950
        # Get the corresponding servlet
951
        found_servlet = self.get_servlet(path)
4✔
952
        if found_servlet is not None:
4✔
953
            servlet, _, prefix, servlet_type = found_servlet
4✔
954

955
            async_name = f"do_async_{request.method.upper()}"
4✔
956
            sync_name = f"do_{request.method.upper()}"
4✔
957

958
            try:
4✔
959
                match servlet_type:
4✔
960
                    case http.ServletType.ASYNC if hasattr(servlet, async_name):
4✔
961
                        # Prepare the helpers
962
                        servlet_request = _AsyncHTTPServletRequest(request, path, prefix)
4✔
963
                        servlet_response = _AsyncHTTPServletResponse(request)
4✔
964

965
                        # Handle the request
966
                        handler_method = getattr(servlet, async_name)
4✔
967
                        await handler_method(servlet_request, servlet_response)
4✔
968
                        return servlet_response.to_aiohttp_response()
4✔
969

970
                    case http.ServletType.SYNC if hasattr(servlet, sync_name):
4✔
971
                        # Read the request content
972
                        # FIXME: find a better way to handle the content, wrapping the request
973
                        #        in a file-like object
974
                        content = await request.read()
4✔
975

976
                        # Prepare the helpers
977
                        servlet_request = _SyncHTTPServletRequest(request, path, prefix, content)
4✔
978
                        servlet_response = _SyncHTTPServletResponse(request, self._loop)
4✔
979

980
                        # Handle the request in the executor
981
                        handler_method = getattr(servlet, sync_name)
4✔
982
                        await self._loop.run_in_executor(
4✔
983
                            self._executor, handler_method, servlet_request, servlet_response
984
                        )
985
                        return servlet_response.to_aiohttp_response()
4✔
986

987
                    case http.ServletType.WEBSOCKET if isinstance(servlet, http.WebSocketHandler):
4✔
988
                        # Prepare the WebSocket handler
989
                        ws_handler = cast(http.WebSocketHandler, servlet)
4✔
990
                        servlet_request = _AsyncHTTPServletRequest(request, path, prefix)
4✔
991

992
                        # Prepare the WebSocket response
993
                        ws_response = aiohttp.web.WebSocketResponse()
4✔
994

995
                        # Prepare a session
996
                        ws_session = WSSession(ws_handler, servlet_request, ws_response)
4✔
997

998
                        # Early check
999
                        if not await ws_handler.ws_accept(servlet_request):
4✔
1000
                            # The handler does not accept the WebSocket connection
NEW
1001
                            return aiohttp.web.Response(status=400, text="WebSocket connection refused")
×
1002

1003
                        # Prepare the WebSocket response
1004
                        await ws_response.prepare(request)
4✔
1005

1006
                        # Notify the WebSocket handler of the new connection
1007
                        await ws_handler.ws_open(ws_session, servlet_request)
4✔
1008

1009
                        async for msg in ws_response:
4✔
1010
                            # Handle incoming messages
1011
                            match msg.type:
4✔
1012
                                case aiohttp.WSMsgType.ERROR:
4✔
1013
                                    # Error message received
NEW
1014
                                    self._logger.error("WebSocket error: %s", ws_response.exception())
×
NEW
1015
                                    await ws_handler.ws_error(ws_session, msg.data)
×
1016

1017
                                case aiohttp.WSMsgType.PING:
4✔
1018
                                    # Ping message received
NEW
1019
                                    await ws_response.pong(msg.data)
×
1020

1021
                                case aiohttp.WSMsgType.BINARY:
4✔
1022
                                    # Binary message received
NEW
1023
                                    await ws_handler.ws_binary(ws_session, msg.data)
×
1024

1025
                                case aiohttp.WSMsgType.TEXT:
4✔
1026
                                    # Text message received
1027
                                    await ws_handler.ws_message(ws_session, msg.data)
4✔
1028
                        else:
1029
                            code = ws_response.close_code or aiohttp.WSCloseCode.GOING_AWAY
4✔
1030
                            await ws_handler.ws_close(ws_session, code, "Session closed")
4✔
1031

1032
                        await ws_response.close()
4✔
1033
                        return ws_response
4✔
1034
            except:
4✔
1035
                # Send a 500 error page on error
1036
                self._logger.exception("Error handling %s request to %s", request.method, path)
4✔
1037
                return self.send_exception(path)
4✔
1038

1039
        # Return the super implementation if needed
1040
        return aiohttp.web.Response(status=404, body=self.make_not_found_page(path), content_type="text/html")
4✔
1041

1042
    def send_exception(self, path: str) -> aiohttp.web.Response:
        """
        Prepares an exception page with a 500 error code.
        Must be called from inside the exception handling block, as the stack
        trace is the one of the exception being handled.

        :param path: Path of the request which caused the error
        :return: The response holding the error page
        """
        # Get a formatted stack trace
        stack = traceback.format_exc()

        # Log the error
        self._logger.error("Error handling request upon: %s\n%s\n", path, stack)

        # Send the page: it is an HTML document, like the "not found" page
        return aiohttp.web.Response(
            status=500, text=self.make_exception_page(path, stack), content_type="text/html"
        )
1057

1058
    def __safe_callback(self, instance: http.Servlet, method: str, *args: Any, **kwargs: Any) -> Any:
        """
        Calls a method of the given instance, shielding the caller from errors.

        The outcome is:

        * False if the instance is None or if the method raised an exception
        * True if the instance does not have such a method, or if the method
          returned None
        * the result of the method in any other case

        :param instance: The instance to call
        :param method: The name of the method to call in the instance
        :return: The method result, True on method absence, False on error
        """
        if instance is None:
            # An invalid instance is a failure
            return False

        try:
            target = getattr(instance, method)
        except AttributeError:
            # A missing method is a success
            return True

        try:
            outcome = target(*args, **kwargs)
        except Exception as ex:
            self.log_exception("Error calling back an instance: %s", ex)
            return False

        # Special case: None is considered as a success
        return True if outcome is None else outcome
1092

1093
    def __register_servlet_service(
        self,
        service: http.Servlet | http.AsyncServlet | http.WebSocketHandler,
        service_reference: ServiceReference[http.Servlet | http.AsyncServlet | http.WebSocketHandler],
    ) -> None:
        """
        Registers a servlet service on the path(s) given in its properties.

        Each specification provided by the service is handled in turn
        (synchronous servlet, asynchronous servlet, WebSocket handler), as a
        service can implement several of them. The path property can be a
        single string or a list/tuple of strings; any other value is ignored.

        :param service: A servlet service
        :param service_reference: The associated ServiceReference
        """
        spec = cast(list[str], service_reference.get_property(fw_constants.OBJECTCLASS))
        servlet = cast(Any, service)

        # (specification, path property, servlet type), in registration order
        kinds = (
            (http.HTTP_SERVLET, http.HTTP_SERVLET_PATH, http.ServletType.SYNC),
            (http.HTTP_SERVLET_ASYNC, http.HTTP_SERVLET_ASYNC_PATH, http.ServletType.ASYNC),
            (http.HTTP_WEBSOCKET_HANDLER, http.HTTP_WEBSOCKET_PATH, http.ServletType.WEBSOCKET),
        )

        for specification, path_property, servlet_type in kinds:
            if specification not in spec:
                # Specification not provided by this service
                continue

            paths = service_reference.get_property(path_property)
            if utilities.is_string(paths):
                # Single path
                self.register_servlet(paths, servlet, {}, servlet_type)
            elif isinstance(paths, (list, tuple)):
                # Multiple paths
                for single_path in paths:
                    self.register_servlet(single_path, servlet, {}, servlet_type)
1142

1143
    @BindField("_servlets_services")
    @BindField("_servlets_async_services")
    @BindField("_websocket_handler_services")
    def _bind_servlet(
        self,
        _: str,
        service: http.Servlet | http.AsyncServlet | http.WebSocketHandler,
        service_reference: ServiceReference[http.Servlet | http.AsyncServlet | http.WebSocketHandler],
    ) -> None:
        """
        Called by iPOPO when a servlet service is bound.

        Imported services are ignored. The reference of any other service is
        stored, and the service is registered immediately if the component has
        already been validated.
        """
        if not self.__is_imported(service_reference):
            with self._binding_lock:
                # Keep track of the reference of the service
                self._servlets_refs[service] = service_reference

                if self._validated:
                    # Already validated: the service can be registered right now
                    self.__register_servlet_service(service, service_reference)
        else:
            self._logger.debug("Ignoring imported service as it is imported: %s", service_reference)
1166

1167
    @UpdateField("_servlets_services")
    @UpdateField("_servlets_async_services")
    @UpdateField("_websocket_handler_services")
    def _update_servlet(
        self,
        _: str,
        service: http.Servlet | http.AsyncServlet | http.WebSocketHandler,
        service_reference: ServiceReference[http.Servlet | http.AsyncServlet | http.WebSocketHandler],
        old_properties: Dict[str, Any],
    ) -> None:
        """
        Called by iPOPO when the properties of a servlet service have changed.

        Imported services are ignored. If one of the path properties has been
        modified, the service is unregistered from its previous paths, then
        registered again if the component is validated.
        """
        if self.__is_imported(service_reference):
            # Imported services are not handled
            return

        # Properties which have an effect on the registration
        path_properties = (
            http.HTTP_SERVLET_PATH,
            http.HTTP_SERVLET_ASYNC_PATH,
            http.HTTP_WEBSOCKET_PATH,
        )
        previous_paths = [old_properties.get(name) for name in path_properties]
        current_paths = [service_reference.get_property(name) for name in path_properties]

        if previous_paths == current_paths:
            # The registration is not concerned by the update
            return

        with self._binding_lock:
            # Forget the previous paths
            self.unregister(None, service)

            if self._validated:
                # Use the new properties
                self.__register_servlet_service(service, service_reference)
1204

1205
    @UnbindField("_servlets_services")
    @UnbindField("_servlets_async_services")
    @UnbindField("_websocket_handler_services")
    def _unbind_servlet(
        self,
        _: str,
        service: http.Servlet | http.AsyncServlet | http.WebSocketHandler,
        service_reference: ServiceReference[http.Servlet | http.AsyncServlet | http.WebSocketHandler],
    ) -> None:
        """
        Called by iPOPO when a servlet service is gone.

        Imported services are ignored. All the paths handled by the service
        are unregistered and its stored reference is dropped.
        """
        if self.__is_imported(service_reference):
            # Imported services are not handled
            return

        with self._binding_lock:
            # Unregister every path associated to this servlet
            self.unregister(None, service)

            # Drop the service reference, if it was known
            self._servlets_refs.pop(service, None)
1231

1232
    def get_access(self) -> Tuple[str, int]:
        """
        Returns the (address, port) tuple the server is bound to

        :return: The (address, port) tuple to access the server
        """
        bound_address = self._bound_address
        assert bound_address is not None, "Server must be started before accessing its address"
        return bound_address
1238

1239
    @staticmethod
4✔
1240
    def get_hostname() -> str:
4✔
1241
        """
1242
        Retrieves the server host name
1243

1244
        :return: The server host name
1245
        """
1246
        return socket.gethostname()
4✔
1247

1248
    def is_https(self) -> bool:
        """
        Tells if this server is an HTTPS one

        :return: True if this server uses SSL
        """
        uses_ssl = self._uses_ssl
        return uses_ssl
1255

1256
    def get_registered_paths(self) -> List[str]:
        """
        Lists the paths which have been registered by servlets

        :return: The paths registered by servlets (sorted list)
        """
        registered_paths = list(self._servlets)
        registered_paths.sort()
        return registered_paths
1263

1264
    def get_servlet(
        self, path: Optional[str]
    ) -> Optional[Tuple[http.Servlet | http.AsyncServlet, Dict[str, Any], str, http.ServletType]]:
        """
        Finds the servlet registered on the longest path which is a prefix of
        the given one. The comparison is case-insensitive and works on whole
        path segments.

        :param path: A request URI
        :return: A tuple (servlet, parameters, prefix, type), or None if the
                 path is invalid or if no servlet matches it
        """
        if not path or path[0] != "/":
            # Not an absolute path: no servlet can match
            return None

        # Normalize the path: lower case, with a trailing slash
        path = path.lower()
        if path[-1] != "/":
            path += "/"

        with self._lock:
            # Registered paths which are a prefix of the requested one
            # (compared with a trailing slash to match whole segments)
            candidates = [
                servlet_path
                for servlet_path in self._servlets
                if path.startswith(servlet_path if servlet_path[-1] == "/" else servlet_path + "/")
            ]

            # The deepest path wins
            best_match = max(candidates, key=len, default="")
            if not best_match:
                # No servlet for this path
                return None

            servlet, params, servlet_type = self._servlets[best_match]
            return servlet, params, best_match, servlet_type
1308

1309
    def make_not_found_page(self, path: str) -> str:
4✔
1310
        """
1311
        Prepares a "page not found" page for a 404 error
1312

1313
        :param path: Request path
1314
        :return: A HTML page
1315
        """
1316
        page = None
4✔
1317
        if self._error_handler is not None:
4✔
NEW
1318
            page = self._error_handler.make_not_found_page(path)
×
1319

1320
        if not page:
4✔
1321
            page = f"""<html>
4✔
1322
<head>
1323
<title>404 - Page not found</title>
1324
</head>
1325
<body>
1326
<h1>Page not found</h1>
1327
<p>No servlet is associated to this path:</p>
1328
<pre>{path}</pre>
1329
<h2>Registered paths:</h2>
1330
{http.make_html_list(self.get_registered_paths())}
1331
</body>
1332
</html>"""
1333
        return page
4✔
1334

1335
    def make_exception_page(self, path: str, stack: str) -> str:
4✔
1336
        """
1337
        Prepares a page printing an exception stack trace in a 500 error
1338

1339
        :param path: Request path
1340
        :param stack: Exception stack trace
1341
        :return: A HTML page
1342
        """
1343
        page = None
4✔
1344
        if self._error_handler is not None:
4✔
NEW
1345
            page = self._error_handler.make_exception_page(path, stack)
×
1346

1347
        if not page:
4✔
1348
            page = f"""<html>
4✔
1349
<head>
1350
<title>500 - Internal Server Error</title>
1351
</head>
1352
<body>
1353
<h1>Internal Server Error</h1>
1354
<p>Error handling request upon: {path}</p>
1355
<pre>
1356
{stack}
1357
</pre>
1358
</body>
1359
</html>"""
1360
        return page
4✔
1361

1362
    def register_servlet(
        self,
        path: str,
        servlet: http.Servlet | http.AsyncServlet,
        parameters: Optional[Dict[str, Any]] = None,
        servlet_type: http.ServletType = http.ServletType.SYNC,
    ) -> bool:
        """
        Registers a servlet on the given path.

        The server information is added to the parameters, then the servlet is
        asked to accept the binding ("accept_binding") and is notified of it
        ("bound_to"). Registering the same servlet twice on a path is a no-op.

        :param path: Path handled by this servlet
        :param servlet: The servlet instance
        :param parameters: The parameters associated to this path
        :param servlet_type: The type of servlet (sync, async, websocket, ...)
        :return: True if the servlet has been registered, False if it refused the binding.
        :raise ValueError: Invalid path or handler, or path already used by another servlet
        """
        if servlet is None:
            raise ValueError("Invalid servlet instance")

        if not path or path[0] != "/":
            raise ValueError("Invalid path given to register the servlet: {0}".format(path))

        # Paths are stored in lower case
        path = path.lower()

        if parameters is None:
            parameters = {}

        with self._lock:
            current = self._servlets.get(path)
            if current is not None and current[0] is servlet:
                # Double-registration: nothing to do
                return True

            # The path might be used by another servlet
            already_taken = current is not None

            # Add server information in parameters
            parameters.update(
                {
                    http.PARAM_ADDRESS: self._address,
                    http.PARAM_PORT: self._port,
                    http.PARAM_HTTPS: self._uses_ssl,
                    http.PARAM_NAME: self._instance_name,
                    http.PARAM_EXTRA: self._extra.copy() if self._extra else None,
                    http.PARAM_ASYNC: servlet_type != http.ServletType.SYNC,
                }
            )

            # The servlet is asked first: if it refuses this server, there is
            # no need to raise the "already taken path" exception
            if not self.__safe_callback(servlet, "accept_binding", path, parameters):
                return False

            if already_taken:
                raise ValueError("A servlet is already registered on {0}".format(path))

            # Tell the servlet it can be bound to the path
            if not self.__safe_callback(servlet, "bound_to", path, parameters):
                # The servlet refused the binding
                return False

            # Store the servlet
            self._servlets[path] = (servlet, parameters, servlet_type)
            return True
1431

1432
    def unregister(self, path: Optional[str], servlet: Optional[http.Servlet] = None) -> bool:
        """
        Unregisters the servlet of the given path, or all the paths of the
        given servlet. The servlet is notified through "unbound_from" for each
        unregistered path.

        :param path: The path to a servlet
        :param servlet: If given, unregisters all the paths handled by this servlet
        :return: True if at least one path as been unregistered, else False
        """
        if servlet is None:
            if not path:
                # Nothing to unregister
                return False

            # Paths are stored in lower case
            path = path.lower()

            with self._lock:
                servlet_info = self._servlets.get(path)
                if servlet_info is None:
                    # No servlet on this path
                    return False

                # Notify the servlet
                registered_servlet, parameters = servlet_info[0], servlet_info[1]
                self.__safe_callback(registered_servlet, "unbound_from", path, parameters)

                # Forget the servlet
                try:
                    del self._servlets[path]
                except KeyError:
                    self.log(logging.DEBUG, "Tried to remove an unknown servlet path: %s", path)
                return True

        # Servlet given: look for all of its paths...
        with self._lock:
            servlet_paths = [
                known_path for known_path, known_info in self._servlets.items() if known_info[0] == servlet
            ]

        # ... and unregister every one of them
        outcomes = [self.unregister(known_path) for known_path in servlet_paths]
        return any(outcomes)
1477

1478
    def log(self, level: int, message: str, *args: Any, **kwargs: Any) -> None:
        """
        Logs the given message, if a logger is available

        :param level: Log entry level
        :param message: Log message (Python logging format)
        """
        logger = self._logger
        if logger is None:
            # No logger: nothing to do
            return

        logger.log(level, message, *args, **kwargs)
1488

1489
    def log_exception(self, message: str, *args: Any, **kwargs: Any) -> None:
        """
        Logs an exception, if a logger is available

        :param message: Log message (Python logging format)
        """
        logger = self._logger
        if logger is None:
            # No logger: nothing to do
            return

        logger.exception(message, *args, **kwargs)
1498

1499
    @staticmethod
    def __is_imported(service_reference: ServiceReference[Any]) -> bool:
        """
        Tells if the given service has been imported by Remote Services,
        according to the value of its "imported" property

        :param service_reference: The reference of the service to check
        :return: True if the service is flagged as imported
        """
        imported_flag = service_reference.get_property(pelix.remote.PROP_IMPORTED)
        return cast(bool, imported_flag)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc