• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tableau / TabPy / 4694295226

pending completion
4694295226

Pull #595

github

GitHub
Merge a85607d7c into fad6807d4
Pull Request #595: TabPy Arrow Support

207 of 207 new or added lines in 7 files covered. (100.0%)

1317 of 2311 relevant lines covered (56.99%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/tabpy/tabpy_server/handlers/evaluation_plane_handler.py
1
import pandas
1✔
2
import pyarrow
1✔
3
import uuid
1✔
4

5
from tabpy.tabpy_server.handlers import BaseHandler
1✔
6
import json
1✔
7
import simplejson
1✔
8
import logging
1✔
9
from tabpy.tabpy_server.common.util import format_exception
1✔
10
import requests
1✔
11
from tornado import gen
1✔
12
from datetime import timedelta
1✔
13
from tabpy.tabpy_server.handlers.util import AuthErrorStates
1✔
14

15
class RestrictedTabPy:
    """Small HTTP client that forwards ``query`` calls to the local server.

    An instance keeps the connection settings it was built with and uses
    them for every request issued through :meth:`query`.
    """

    def __init__(self, protocol, port, logger, timeout, headers):
        """Remember the connection settings used by :meth:`query`.

        Args:
            protocol: URL scheme placed in front of ``://localhost``.
            port: Port of the local server to query.
            logger: Object exposing ``log(level, message)``.
            timeout: Timeout handed to ``requests.post``.
            headers: HTTP headers sent with every query.
        """
        (
            self.protocol,
            self.port,
            self.logger,
            self.timeout,
            self.headers,
        ) = (protocol, port, logger, timeout, headers)

    def query(self, name, *args, **kwargs):
        """POST the arguments to ``/query/<name>`` and return the JSON reply.

        Positional arguments take precedence: keyword arguments are only
        sent when no positional arguments were given.

        Args:
            name: Name of the endpoint to query.
            *args: Positional arguments forwarded as the ``data`` payload.
            **kwargs: Keyword arguments forwarded when ``args`` is empty.

        Returns:
            The response body decoded from JSON.
        """
        url = f"{self.protocol}://localhost:{self.port}/query/{name}"
        self.logger.log(logging.DEBUG, f"Querying {url}...")
        payload = json.dumps({"data": args or kwargs})
        # Certificate verification is switched off for this localhost call.
        reply = requests.post(
            url=url,
            data=payload,
            headers=self.headers,
            timeout=self.timeout,
            verify=False,
        )
        return reply.json()
33

34

35
class EvaluationPlaneDisabledHandler(BaseHandler):
    """
    Handler installed when ad-hoc scripts have been disabled: every POST is
    answered with an error message instead of being evaluated.
    """

    def initialize(self, executor, app):
        """Initialise the base handler with ``app`` and keep the executor."""
        super().initialize(app)
        self.executor = executor

    @gen.coroutine
    def post(self):
        """Reject the request: auth failure first, otherwise a 404."""
        auth_state = self.should_fail_with_auth_error()
        if auth_state == AuthErrorStates.NONE:
            self.error_out(
                404,
                "Ad-hoc scripts have been disabled on this analytics extension, please contact your "
                "administrator.",
            )
        else:
            self.fail_with_auth_error()
51

52

53
class EvaluationPlaneHandler(BaseHandler):
    """
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    The script is taken from the ``script`` field of the JSON request body,
    wrapped into a ``_user_script`` function and submitted to the executor.
    Script arguments come either inline from the ``data`` field or, when an
    Arrow server is configured and ``dataPath`` is present, from that server.
    """

    def initialize(self, executor, app):
        """Initialise the base handler and keep the executor and Arrow server.

        Args:
            executor: Executor the wrapped user script is submitted to.
            app: Application object; its ``arrow_server`` attribute is stored.
        """
        super(EvaluationPlaneHandler, self).initialize(app)
        self.arrow_server = app.arrow_server
        self.executor = executor
        self._error_message_timeout = (
            f"User defined script timed out. "
            f"Timeout is set to {self.eval_timeout} s."
        )

    @gen.coroutine
    def _post_impl(self):
        """Parse the request, run the user script and write the response.

        Responds with 400 when the script or its arguments are malformed and
        with 408 when the script times out. Other exceptions propagate to the
        caller.
        """
        body = json.loads(self.request.body.decode("utf-8"))
        self.logger.log(logging.DEBUG, "Processing POST request...")
        if "script" not in body:
            self.error_out(400, "Script is empty.")
            return

        # Transforming user script into a proper function.
        user_code = body["script"]
        # Arrow is used only when a server is configured AND the request
        # names a data path; the same condition selects the response format.
        use_arrow = self.arrow_server is not None and "dataPath" in body
        arguments = None
        arguments_str = ""
        if use_arrow:
            # arrow flight scenario
            arrow_data = self.get_arrow_data(body["dataPath"])
            if arrow_data is not None:
                arguments = {"_arg1": arrow_data}
        elif "data" in body:
            # legacy scenario
            arguments = body["data"]

        if arguments is not None:
            if not isinstance(arguments, dict):
                self.error_out(
                    400, "Script parameters need to be provided as a dictionary."
                )
                return
            args_in = sorted(arguments.keys())
            n = len(arguments)
            # Argument names must be exactly _arg1.._argN (compared as sorted
            # strings, so e.g. _arg10 sorts before _arg2 on both sides).
            if sorted('_arg' + str(i + 1) for i in range(n)) == args_in:
                arguments_str = ", " + ", ".join(args_in)
            else:
                self.error_out(
                    400,
                    "Variables names should follow "
                    "the format _arg1, _arg2, _argN",
                )
                return

        # Each script line is indented by one space to form the function body.
        function_to_evaluate = (
            f"def _user_script(tabpy{arguments_str}):\n"
            + "".join(f" {line}\n" for line in user_code.splitlines())
        )

        self.logger.log(
            logging.INFO, f"function to evaluate={function_to_evaluate}"
        )

        try:
            result = yield self._call_subprocess(function_to_evaluate, arguments)
        except (
            gen.TimeoutError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ):
            self.logger.log(logging.ERROR, self._error_message_timeout)
            self.error_out(408, self._error_message_timeout)
            return

        if result is not None:
            if use_arrow:
                # arrow flight scenario: store the result on the Arrow server
                # and reply with the id under which it was stored.
                output_data_id = str(uuid.uuid4())
                self.upload_arrow_data(result, output_data_id, {
                    'removeOnDelete': 'True',
                    'linkedIDs': body["dataPath"]
                })
                result = {'outputDataPath': output_data_id}
                self.logger.log(
                    logging.WARNING, f'outputDataPath={output_data_id}'
                )
            elif isinstance(result, pandas.DataFrame):
                result = result.to_dict(orient='list')
            self.write(simplejson.dumps(result, ignore_nan=True))
        else:
            self.write("null")
        self.finish()

    def get_arrow_data(self, filename):
        """Take the table stored under ``filename`` from the Arrow server.

        The table is removed from ``arrow_server.flights`` and converted
        with ``to_pandas()``.

        Args:
            filename: Path used to build the flight descriptor.

        Returns:
            The result of ``to_pandas()`` on the stored table, or ``''`` when
            the flight info lists no endpoint location.
        """
        descriptor = pyarrow.flight.FlightDescriptor.for_path(filename)
        info = self.arrow_server.get_flight_info(None, descriptor)
        for endpoint in info.endpoints:
            for _ in endpoint.locations:
                key = (descriptor.descriptor_type.value, descriptor.command,
                       tuple(descriptor.path or tuple()))
                return self.arrow_server.flights.pop(key).to_pandas()
        self.logger.log(logging.INFO, 'no data found for get')
        # NOTE(review): the caller tests the result with `is not None`, so
        # this empty string is still passed to the script as _arg1. Kept for
        # backward compatibility - confirm whether None was intended.
        return ''

    def upload_arrow_data(self, data, filename, metadata):
        """Store ``data`` as an Arrow table on the Arrow server.

        Args:
            data: Anything accepted by ``pyarrow.table``.
            filename: Path used to build the flight descriptor / storage key.
            metadata: Optional mapping added to the table's schema metadata;
                ``None`` leaves the schema untouched.
        """
        my_table = pyarrow.table(data)
        if metadata is not None:
            # Tables and schemas are immutable: with_metadata() returns a new
            # schema and leaves the table unchanged, so the table itself has
            # to be rebuilt. Existing schema metadata is kept and extended.
            combined = dict(my_table.schema.metadata or {})
            combined.update(metadata)
            my_table = my_table.replace_schema_metadata(combined)
        descriptor = pyarrow.flight.FlightDescriptor.for_path(filename)
        key = (descriptor.descriptor_type.value, descriptor.command,
               tuple(descriptor.path or tuple()))
        self.arrow_server.flights[key] = my_table

    @gen.coroutine
    def post(self):
        """Handle POST: check auth, run the script, map failures to errors."""
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
            return

        self._add_CORS_header()
        try:
            yield self._post_impl()
        except Exception as e:
            import traceback
            self.logger.log(logging.ERROR, traceback.format_exc())
            err_msg = f"{e.__class__.__name__} : {str(e)}"
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, "POST /evaluate")
                self.error_out(500, "Error processing script", info=err_msg)
            else:
                # A missing 'response' key is reported as an endpoint that
                # did not answer rather than as a server error.
                self.error_out(
                    404,
                    "Error processing script",
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.",
                )

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        """Define the wrapped user script and run it on the executor.

        Args:
            function_to_evaluate: Source text defining ``_user_script``.
            arguments: Mapping of ``_argN`` names to values, or ``None`` when
                the script takes no arguments.

        Returns:
            Whatever the user script returns.

        Raises:
            tornado.gen.TimeoutError: the script did not finish within
                ``self.eval_timeout`` seconds.
        """
        restricted_tabpy = RestrictedTabPy(
            self.protocol, self.port, self.logger, self.eval_timeout, self.request.headers
        )
        # SECURITY: this executes code taken from the request body. Exposure
        # of this handler must be controlled by the deployment.
        # Exec does not run the function, so it does not block.
        exec(function_to_evaluate, globals())

        # `**None` raises TypeError, so a script without arguments is called
        # with an empty keyword mapping instead.
        call_kwargs = arguments if arguments is not None else {}

        # 'noqa' comments below tell flake8 to ignore undefined _user_script
        # name - the name is actually defined with user script being wrapped
        # in _user_script function (constructed as a string) and then executed
        # with exec() call above.
        future = self.executor.submit(_user_script,  # noqa: F821
                                      restricted_tabpy,
                                      **call_kwargs)

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
        raise gen.Return(ret)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc