• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tableau / TabPy / 4694295226

pending completion
4694295226

Pull #595

github

GitHub
Merge a85607d7c into fad6807d4
Pull Request #595: TabPy Arrow Support

207 of 207 new or added lines in 7 files covered. (100.0%)

1317 of 2311 relevant lines covered (56.99%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.34
/tabpy/tabpy_server/app/app.py
1
import concurrent.futures
1✔
2
import configparser
1✔
3
import logging
1✔
4
import multiprocessing
1✔
5
import os
1✔
6
import shutil
1✔
7
import signal
1✔
8
import sys
1✔
9
import tabpy
1✔
10
from tabpy.tabpy import __version__
1✔
11
from tabpy.tabpy_server.app.app_parameters import ConfigParameters, SettingsParameters
1✔
12
from tabpy.tabpy_server.app.util import parse_pwd_file
1✔
13
from tabpy.tabpy_server.handlers.basic_auth_server_middleware_factory import BasicAuthServerMiddlewareFactory
1✔
14
from tabpy.tabpy_server.handlers.no_op_auth_handler import NoOpAuthHandler
1✔
15
from tabpy.tabpy_server.management.state import TabPyState
1✔
16
from tabpy.tabpy_server.management.util import _get_state_from_file
1✔
17
from tabpy.tabpy_server.psws.callbacks import init_model_evaluator, init_ps_server
1✔
18
from tabpy.tabpy_server.psws.python_service import PythonService, PythonServiceHandler
1✔
19
from tabpy.tabpy_server.handlers import (
1✔
20
    EndpointHandler,
21
    EndpointsHandler,
22
    EvaluationPlaneHandler,
23
    EvaluationPlaneDisabledHandler,
24
    QueryPlaneHandler,
25
    ServiceInfoHandler,
26
    StatusHandler,
27
    UploadDestinationHandler,
28
)
29
import tornado
1✔
30
import tabpy.tabpy_server.app.arrow_server as pa
1✔
31
import _thread
1✔
32

33
logger = logging.getLogger(__name__)
1✔
34

35
def _init_asyncio_patch():
1✔
36
    """
37
    Select compatible event loop for Tornado 5+.
38
    As of Python 3.8, the default event loop on Windows is `proactor`,
39
    however Tornado requires the old default "selector" event loop.
40
    As Tornado has decided to leave this to users to set, MkDocs needs
41
    to set it. See https://github.com/tornadoweb/tornado/issues/2608.
42
    """
43
    if sys.platform.startswith("win") and sys.version_info >= (3, 8):
1✔
44
        import asyncio
×
45
        try:
×
46
            from asyncio import WindowsSelectorEventLoopPolicy
×
47
        except ImportError:
×
48
            pass  # Can't assign a policy which doesn't exist.
×
49
        else:
50
            if not isinstance(asyncio.get_event_loop_policy(), WindowsSelectorEventLoopPolicy):
×
51
                asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
×
52

53

54
class TabPyApp:
1✔
55
    """
56
    TabPy application class for keeping context like settings, state, etc.
57
    """
58

59
    settings = {}
1✔
60
    subdirectory = ""
1✔
61
    tabpy_state = None
1✔
62
    python_service = None
1✔
63
    credentials = {}
1✔
64
    arrow_server = None
1✔
65

66
    def __init__(self, config_file):
        """
        Configure logging from the config file and load the settings.

        :param config_file: path to the configuration file. When ``None``,
            ``common/default.conf`` resolved relative to this module's
            directory is used instead.
        """
        if config_file is None:
            module_dir = os.path.dirname(__file__)
            config_file = os.path.join(
                module_dir, os.path.pardir, "common", "default.conf"
            )

        if os.path.isfile(config_file):
            try:
                from logging import config as logging_config

                logging_config.fileConfig(
                    config_file, disable_existing_loggers=False
                )
            except KeyError:
                # A KeyError while applying the file's logging setup falls
                # back to basic DEBUG-level logging.
                logging.basicConfig(level=logging.DEBUG)

        self._parse_config(config_file)
80

81
    def _get_tls_certificates(self, config):
        """
        Read the configured certificate and key files as raw bytes.

        :param config: settings mapping holding the certificate file and
            key file paths.
        :return: a one-element list containing a
            ``(certificate bytes, private key bytes)`` tuple.
        """
        cert_path = config[SettingsParameters.CertificateFile]
        key_path = config[SettingsParameters.KeyFile]

        with open(cert_path, "rb") as cert_stream:
            cert_chain = cert_stream.read()
        with open(key_path, "rb") as key_stream:
            private_key = key_stream.read()

        return [(cert_chain, private_key)]
91
    
92
    def _get_arrow_server(self, config):
        """
        Create the Arrow Flight server described by ``config``.

        The server location is ``grpc+tls`` on localhost when the transfer
        protocol is "https" (with certificates read from the configured
        files), otherwise ``grpc+tcp``. Basic-auth middleware is attached
        only when the v1 API features include "authentication".

        :param config: settings mapping of the application.
        :return: the ``FlightServer`` instance; it is not started here.
        """
        use_tls = config[SettingsParameters.TransferProtocol] == "https"
        scheme = "grpc+tls" if use_tls else "grpc+tcp"
        tls_certificates = self._get_tls_certificates(config) if use_tls else None

        host = "localhost"
        port = config.get(SettingsParameters.ArrowFlightPort)
        location = f"{scheme}://{host}:{port}"

        api_features = config[SettingsParameters.ApiVersions]["v1"]["features"]
        if "authentication" in api_features:
            _, creds = parse_pwd_file(config[ConfigParameters.TABPY_PWD_FILE])
            auth_middleware = {"basic": BasicAuthServerMiddlewareFactory(creds)}
        else:
            auth_middleware = None

        return pa.FlightServer(
            host,
            location,
            tls_certificates=tls_certificates,
            verify_client=None,
            auth_handler=NoOpAuthHandler(),
            middleware=auth_middleware,
        )
116

117
    def run(self):
        """
        Start the web service, optionally the Arrow server, and block.

        Creates the Tornado application, initializes the model evaluator,
        starts listening on the configured port (with TLS options when the
        transfer protocol is "https"), launches the Arrow Flight server on
        a separate thread when Arrow is enabled, and finally starts the
        Tornado IO loop. The call does not return until the loop stops.

        :raises RuntimeError: if the transfer protocol is neither "http"
            nor "https".
        """
        application = self._create_tornado_web_app()
        max_request_size = (
            int(self.settings[SettingsParameters.MaxRequestSizeInMb]) * 1024 * 1024
        )
        logger.info(f"Setting max request size to {max_request_size} bytes")

        init_model_evaluator(self.settings, self.tabpy_state, self.python_service)

        protocol = self.settings[SettingsParameters.TransferProtocol]
        ssl_options = None
        if protocol == "https":
            ssl_options = {
                "certfile": self.settings[SettingsParameters.CertificateFile],
                "keyfile": self.settings[SettingsParameters.KeyFile],
            }
        elif protocol != "http":
            msg = f"Unsupported transfer protocol {protocol}."
            logger.critical(msg)
            raise RuntimeError(msg)

        # Extra keyword arguments forwarded to application.listen().
        listen_options = {}
        if self.settings[SettingsParameters.GzipEnabled] is True:
            listen_options["decompress_request"] = True

        application.listen(
            self.settings[SettingsParameters.Port],
            ssl_options=ssl_options,
            max_buffer_size=max_request_size,
            max_body_size=max_request_size,
            **listen_options,
        )

        logger.info(
            "Web service listening on port "
            f"{str(self.settings[SettingsParameters.Port])}"
        )

        if self.settings[SettingsParameters.ArrowEnabled]:
            def start_pyarrow():
                # This body runs on the worker thread. Exceptions raised
                # here never propagate to the start_new_thread() caller,
                # so they have to be reported from inside the thread.
                try:
                    self.arrow_server = self._get_arrow_server(self.settings)
                    pa.start(self.arrow_server)
                except Exception as e:
                    logger.critical(f"Failed to start PyArrow server: {e}")

            try:
                _thread.start_new_thread(start_pyarrow, ())
            except Exception as e:
                # Covers failure to create the thread itself.
                logger.critical(f"Failed to start PyArrow server: {e}")

        tornado.ioloop.IOLoop.instance().start()
166

167
    def _create_tornado_web_app(self):
        """
        Build the Tornado application serving all TabPy routes.

        Runs ``init_ps_server`` on the IO loop, creates the thread pool that
        is handed to the ``/evaluate`` handler, registers the URL routes
        (each prefixed with ``self.subdirectory``) and installs a SIGINT
        handler plus a 500 ms periodic check that stops the IO loop once
        the signal has been received.

        :return: the configured Tornado application instance.
        """
        class TabPyTornadoApp(tornado.web.Application):
            # Set by signal_handler; polled by try_exit.
            is_closing = False

            def signal_handler(self, signal, _):
                logger.critical(f"Exiting on signal {signal}...")
                self.is_closing = True

            def try_exit(self):
                if self.is_closing:
                    tornado.ioloop.IOLoop.instance().stop()
                    logger.info("Shutting down TabPy...")

        logger.info("Initializing TabPy...")
        tornado.ioloop.IOLoop.instance().run_sync(
            lambda: init_ps_server(self.settings, self.tabpy_state)
        )
        logger.info("Done initializing TabPy.")

        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count()
        )

        # initialize Tornado application
        _init_asyncio_patch()
        application = TabPyTornadoApp(
            [
                (
                    self.subdirectory + r"/query/([^/]+)",
                    QueryPlaneHandler,
                    dict(app=self),
                ),
                (self.subdirectory + r"/status", StatusHandler, dict(app=self)),
                (self.subdirectory + r"/info", ServiceInfoHandler, dict(app=self)),
                (self.subdirectory + r"/endpoints", EndpointsHandler, dict(app=self)),
                (
                    self.subdirectory + r"/endpoints/([^/]+)?",
                    EndpointHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/evaluate",
                    EvaluationPlaneHandler if self.settings[SettingsParameters.EvaluateEnabled]
                    else EvaluationPlaneDisabledHandler,
                    dict(executor=executor, app=self),
                ),
                (
                    self.subdirectory + r"/configurations/endpoint_upload_destination",
                    UploadDestinationHandler,
                    dict(app=self),
                ),
                (
                    self.subdirectory + r"/(.*)",
                    tornado.web.StaticFileHandler,
                    dict(
                        path=self.settings[SettingsParameters.StaticPath],
                        default_filename="index.html",
                    ),
                ),
            ],
            debug=False,
            **self.settings,
        )

        # Register the shutdown hooks exactly once: a second registration
        # would start a redundant periodic callback polling the same flag.
        signal.signal(signal.SIGINT, application.signal_handler)
        tornado.ioloop.PeriodicCallback(application.try_exit, 500).start()

        return application
238

239
    def _set_parameter(self, parser, settings_key, config_key, default_val, parse_function):
        """
        Store one setting, preferring the parsed config over the default.

        :param parser: config parser queried for the "TabPy" section.
        :param settings_key: key under which the value is stored in
            ``self.settings``.
        :param config_key: option name in the "TabPy" section, or ``None``
            when the setting cannot come from the config.
        :param default_val: value used when the option is absent; ``None``
            means there is no default and the setting stays unset.
        :param parse_function: callable taking ``(section, option)``;
            ``parser.get`` is used when this is ``None``.
        """
        found_in_config = (
            config_key is not None
            and parser.has_section("TabPy")
            and parser.has_option("TabPy", config_key)
        )

        if found_in_config:
            reader = parser.get if parse_function is None else parse_function
            self.settings[settings_key] = reader("TabPy", config_key)
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from config file or environment variable"
            )
            return

        if default_val is not None:
            self.settings[settings_key] = default_val
            logger.debug(
                f"Parameter {settings_key} set to "
                f'"{self.settings[settings_key]}" '
                "from default value"
            )
            return

        logger.debug(f"Parameter {settings_key} is not set")
268

269
    def _parse_config(self, config_file):
        """Provide consistent mechanism for pulling in configuration.

        Attempt to retain backward compatibility for
        existing implementations by grabbing port
        setting from CLI first.

        Take settings in the following order:

        1. CLI arguments if present
        2. config file
        3. OS environment variables (for ease of
           setting defaults if not present)
        4. current defaults if a setting is not present in any location

        Additionally provide similar configuration capabilities in between
        config file and environment variables.
        For consistency use the same variable name in the config file as
        in the os environment.
        For naming standards use all capitals and start with 'TABPY_'

        Resets ``settings``, ``subdirectory``, ``tabpy_state``,
        ``python_service`` and ``credentials`` on this instance before
        populating them. A config file that cannot be read is not fatal:
        the reason is logged and default settings are used.

        :param config_file: path of the configuration file to read.
        :raises RuntimeError: if a passwords file is configured but cannot
            be read, or if the transfer protocol settings are invalid.
        """
        self.settings = {}
        self.subdirectory = ""
        self.tabpy_state = None
        self.python_service = None
        self.credentials = {}

        pkg_path = os.path.dirname(tabpy.__file__)

        parser = configparser.ConfigParser(os.environ)
        logger.info(f"Parsing config file {config_file}")

        file_exists = False
        if os.path.isfile(config_file):
            try:
                with open(config_file, 'r') as f:
                    parser.read_string(f.read())
                    file_exists = True
            except Exception as e:
                # Reading stays best-effort, but the cause is recorded so a
                # malformed or unreadable file can be diagnosed.
                logger.warning(f"Failed to read config file {config_file}: {e}")

        if not file_exists:
            logger.warning(
                f"Unable to open config file {config_file}, "
                "using default settings."
            )

        # (settings key, config option, default value, parse function)
        settings_parameters = [
            (SettingsParameters.Port, ConfigParameters.TABPY_PORT, 9004, None),
            (SettingsParameters.ServerVersion, None, __version__, None),
            (SettingsParameters.EvaluateEnabled, ConfigParameters.TABPY_EVALUATE_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.EvaluateTimeout, ConfigParameters.TABPY_EVALUATE_TIMEOUT,
             30, parser.getfloat),
            (SettingsParameters.UploadDir, ConfigParameters.TABPY_QUERY_OBJECT_PATH,
             os.path.join(pkg_path, "tmp", "query_objects"), None),
            (SettingsParameters.TransferProtocol, ConfigParameters.TABPY_TRANSFER_PROTOCOL,
             "http", None),
            (SettingsParameters.CertificateFile, ConfigParameters.TABPY_CERTIFICATE_FILE,
             None, None),
            (SettingsParameters.KeyFile, ConfigParameters.TABPY_KEY_FILE, None, None),
            (SettingsParameters.StateFilePath, ConfigParameters.TABPY_STATE_PATH,
             os.path.join(pkg_path, "tabpy_server"), None),
            (SettingsParameters.StaticPath, ConfigParameters.TABPY_STATIC_PATH,
             os.path.join(pkg_path, "tabpy_server", "static"), None),
            (ConfigParameters.TABPY_PWD_FILE, ConfigParameters.TABPY_PWD_FILE, None, None),
            (SettingsParameters.LogRequestContext, ConfigParameters.TABPY_LOG_DETAILS,
             "false", None),
            (SettingsParameters.MaxRequestSizeInMb, ConfigParameters.TABPY_MAX_REQUEST_SIZE_MB,
             100, None),
            (SettingsParameters.GzipEnabled, ConfigParameters.TABPY_GZIP_ENABLE,
             True, parser.getboolean),
            (SettingsParameters.ArrowEnabled, ConfigParameters.TABPY_ARROW_ENABLE,
             False, parser.getboolean),
            (SettingsParameters.ArrowFlightPort, ConfigParameters.TABPY_ARROWFLIGHT_PORT,
             13622, parser.getint),
        ]

        for setting, parameter, default_val, parse_function in settings_parameters:
            self._set_parameter(parser, setting, parameter, default_val, parse_function)

        if not os.path.exists(self.settings[SettingsParameters.UploadDir]):
            os.makedirs(self.settings[SettingsParameters.UploadDir])

        # set and validate transfer protocol
        self.settings[SettingsParameters.TransferProtocol] = self.settings[
            SettingsParameters.TransferProtocol
        ].lower()

        self._validate_transfer_protocol_settings()

        # if state.ini does not exist try and create it - remove
        # last dependence on batch/shell script
        self.settings[SettingsParameters.StateFilePath] = os.path.realpath(
            os.path.normpath(
                os.path.expanduser(self.settings[SettingsParameters.StateFilePath])
            )
        )
        state_config, self.tabpy_state = self._build_tabpy_state()

        self.python_service = PythonServiceHandler(PythonService())
        self.settings["compress_response"] = True
        self.settings[SettingsParameters.StaticPath] = os.path.abspath(
            self.settings[SettingsParameters.StaticPath]
        )
        logger.debug(
            f"Static pages folder set to "
            f'"{self.settings[SettingsParameters.StaticPath]}"'
        )

        # Set subdirectory from config if applicable
        if state_config.has_option("Service Info", "Subdirectory"):
            self.subdirectory = "/" + state_config.get("Service Info", "Subdirectory")

        # If passwords file specified load credentials
        if ConfigParameters.TABPY_PWD_FILE in self.settings:
            if not self._parse_pwd_file():
                msg = (
                    "Failed to read passwords file "
                    f"{self.settings[ConfigParameters.TABPY_PWD_FILE]}"
                )
                logger.critical(msg)
                raise RuntimeError(msg)
        else:
            logger.info(
                "Password file is not specified: Authentication is not enabled"
            )

        features = self._get_features()
        self.settings[SettingsParameters.ApiVersions] = {"v1": {"features": features}}

        # The raw setting is a string; anything other than "false"
        # (case-insensitive) enables request-context logging.
        self.settings[SettingsParameters.LogRequestContext] = (
            self.settings[SettingsParameters.LogRequestContext].lower() != "false"
        )
        call_context_state = (
            "enabled"
            if self.settings[SettingsParameters.LogRequestContext]
            else "disabled"
        )
        logger.info(f"Call context logging is {call_context_state}")
407

408
    def _validate_transfer_protocol_settings(self):
        """
        Check the transfer protocol and, for HTTPS, the certificate setup.

        "http" needs no further checks. "https" requires both the
        certificate and key settings to be present and to point to
        existing files, after which the certificate itself is validated.

        :raises RuntimeError: if the protocol is missing or unsupported, or
            if the certificate/key settings are missing or not files.
        """
        settings = self.settings
        protocol_key = SettingsParameters.TransferProtocol

        if protocol_key not in settings:
            msg = "Missing transfer protocol information."
            logger.critical(msg)
            raise RuntimeError(msg)

        protocol = settings[protocol_key]
        if protocol == "http":
            return

        if protocol != "https":
            msg = f"Unsupported transfer protocol: {protocol}"
            logger.critical(msg)
            raise RuntimeError(msg)

        # First make sure both settings exist at all ...
        has_cert = SettingsParameters.CertificateFile in settings
        has_key = SettingsParameters.KeyFile in settings
        self._validate_cert_key_state(
            "The parameter(s) {} must be set.", has_cert, has_key
        )

        # ... then that each one points to a file on disk.
        cert = settings[SettingsParameters.CertificateFile]
        cert_is_file = os.path.isfile(cert)
        key_is_file = os.path.isfile(settings[SettingsParameters.KeyFile])
        self._validate_cert_key_state(
            "The parameter(s) {} must point to an existing file.",
            cert_is_file,
            key_is_file,
        )

        tabpy.tabpy_server.app.util.validate_cert(cert)
437

438
    @staticmethod
    def _validate_cert_key_state(msg, cert_valid, key_valid):
        """
        Raise when the certificate and/or key check did not pass.

        :param msg: message template with one ``{}`` placeholder that
            receives the name(s) of the offending parameter(s).
        :param cert_valid: whether the certificate check passed.
        :param key_valid: whether the key check passed.
        :raises RuntimeError: if either check failed; the message is also
            logged at critical level.
        """
        cert_param = ConfigParameters.TABPY_CERTIFICATE_FILE
        key_param = ConfigParameters.TABPY_KEY_FILE

        if cert_valid and key_valid:
            return

        if not cert_valid and not key_valid:
            offending = f"{cert_param} and {key_param}"
        elif not cert_valid:
            offending = cert_param
        else:
            offending = key_param

        err = "Error using HTTPS: " + msg.format(offending)
        logger.critical(err)
        raise RuntimeError(err)
456

457
    def _parse_pwd_file(self):
        """
        Load credentials from the configured passwords file.

        Stores the parsed credentials in ``self.credentials``. A file that
        parses successfully but yields no credentials counts as a failure
        and is logged as an error.

        :return: the success flag from ``parse_pwd_file``, or ``False``
            when parsing succeeded but no credentials were found.
        """
        pwd_file = self.settings[ConfigParameters.TABPY_PWD_FILE]
        succeeded, self.credentials = parse_pwd_file(pwd_file)

        if not succeeded:
            return succeeded

        if len(self.credentials) == 0:
            logger.error("No credentials found")
            return False

        return succeeded
467

468
    def _get_features(self):
        """
        Describe the enabled features based on the current settings.

        :return: dict with ``evaluate_enabled``, ``gzip_enabled`` and
            ``arrow_enabled`` entries, preceded by an ``authentication``
            entry when a passwords file is configured.
        """
        settings = self.settings
        features = {}

        # Check for auth
        if ConfigParameters.TABPY_PWD_FILE in settings:
            features["authentication"] = {
                "required": True,
                "methods": {"basic-auth": {}},
            }

        flag_sources = (
            ("evaluate_enabled", SettingsParameters.EvaluateEnabled),
            ("gzip_enabled", SettingsParameters.GzipEnabled),
            ("arrow_enabled", SettingsParameters.ArrowEnabled),
        )
        for feature_name, settings_key in flag_sources:
            features[feature_name] = settings[settings_key]

        return features
482

483
    def _build_tabpy_state(self):
        """
        Load the state file, creating it from the template when missing.

        When ``state.ini`` is absent from the configured state directory it
        is copied from ``tabpy_server/state.ini.template`` inside the
        ``tabpy`` package before loading.

        :return: tuple of the object returned by ``_get_state_from_file``
            and a ``TabPyState`` built from it and the current settings.
        """
        package_dir = os.path.dirname(tabpy.__file__)
        state_dir = self.settings[SettingsParameters.StateFilePath]
        state_file = os.path.join(state_dir, "state.ini")

        if not os.path.isfile(state_file):
            template_file = os.path.join(
                package_dir, "tabpy_server", "state.ini.template"
            )
            logger.debug(
                f"File {state_file} not found, creating from "
                f"template {template_file}..."
            )
            shutil.copy(template_file, state_file)

        logger.info(f"Loading state from state file {state_file}")
        state_config = _get_state_from_file(state_dir)
        state = TabPyState(config=state_config, settings=self.settings)
        return state_config, state
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc