• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tableau / TabPy / 7389403568

02 Jan 2024 06:49PM CUT coverage: 57.586%. Remained the same
7389403568

Pull #628

github

web-flow
Update server-install.md

Add 3.10 to list of supported python versions
Pull Request #628: Update server-install.md

1355 of 2353 relevant lines covered (57.59%)

0.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.8
/tabpy/tabpy_server/psws/python_service.py
1
import concurrent.futures
1✔
2
import logging
1✔
3
from tabpy.tabpy_server.common.util import format_exception
1✔
4
from tabpy.tabpy_server.common.messages import (
1✔
5
    LoadObject,
6
    DeleteObjects,
7
    FlushObjects,
8
    CountObjects,
9
    ListObjects,
10
    UnknownMessage,
11
    LoadFailed,
12
    ObjectsDeleted,
13
    ObjectsFlushed,
14
    QueryFailed,
15
    QuerySuccessful,
16
    UnknownURI,
17
    DownloadSkipped,
18
    LoadInProgress,
19
    ObjectCount,
20
    ObjectList,
21
)
22
from tabpy.tabpy_tools.query_object import QueryObject
1✔
23

24

25
logger = logging.getLogger(__name__)
1✔
26

27

28
class PythonServiceHandler:
1✔
29
    """
30
    A wrapper around PythonService object that receives requests and calls the
31
    corresponding methods.
32
    """
33

34
    def __init__(self, ps):
1✔
35
        self.ps = ps
1✔
36

37
    def manage_request(self, msg):
1✔
38
        try:
×
39
            logger.debug(f"Received request {type(msg).__name__}")
×
40
            if isinstance(msg, LoadObject):
×
41
                response = self.ps.load_object(*msg)
×
42
            elif isinstance(msg, DeleteObjects):
×
43
                response = self.ps.delete_objects(msg.uris)
×
44
            elif isinstance(msg, FlushObjects):
×
45
                response = self.ps.flush_objects()
×
46
            elif isinstance(msg, CountObjects):
×
47
                response = self.ps.count_objects()
×
48
            elif isinstance(msg, ListObjects):
×
49
                response = self.ps.list_objects()
×
50
            else:
51
                response = UnknownMessage(msg)
×
52

53
            logger.debug(f"Returning response {response}")
×
54
            return response
×
55
        except Exception as e:
×
56
            logger.exception(e)
×
57
            msg = e
×
58
            if hasattr(e, "message"):
×
59
                msg = e.message
×
60
            logger.error(f"Error processing request: {msg}")
×
61
            return UnknownMessage(msg)
×
62

63

64
class PythonService:
1✔
65
    """
66
    This class is a simple wrapper maintaining loaded query objects from
67
    the current TabPy instance. `query_objects` is a dictionary that
68
    maps query object URI to query objects
69

70
    The query_objects schema is as follow:
71

72
    {'version': <current-successfuly-loaded-version>,
73
     'last_error':<your-recent-error-to-load-model>,
74
     'endpoint_obj':<loaded_query_objects>,
75
     'type':<object-type>,
76
     'status':<LoadSuccessful-or-LoadFailed-or-LoadInProgress>}
77

78
    """
79

80
    def __init__(self, query_objects=None):
1✔
81

82
        self.EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=1)
1✔
83
        self.query_objects = query_objects or {}
1✔
84

85
    def _load_object(
1✔
86
        self, object_uri, object_url, object_version, is_update, object_type
87
    ):
88
        try:
×
89
            logger.info(
×
90
                f"Loading object:, URI={object_uri}, "
91
                f"URL={object_url}, version={object_version}, "
92
                f"is_updated={is_update}"
93
            )
94
            if object_type == "model":
×
95
                po = QueryObject.load(object_url)
×
96
            elif object_type == "alias":
×
97
                po = object_url
×
98
            else:
99
                raise RuntimeError(f"Unknown object type: {object_type}")
×
100

101
            self.query_objects[object_uri] = {
×
102
                "version": object_version,
103
                "type": object_type,
104
                "endpoint_obj": po,
105
                "status": "LoadSuccessful",
106
                "last_error": None,
107
            }
108
        except Exception as e:
×
109
            logger.exception(e)
×
110
            logger.error(
×
111
                f"Unable to load QueryObject: path={object_url}, " f"error={str(e)}"
112
            )
113

114
            self.query_objects[object_uri] = {
×
115
                "version": object_version,
116
                "type": object_type,
117
                "endpoint_obj": None,
118
                "status": "LoadFailed",
119
                "last_error": f"Load failed: {str(e)}",
120
            }
121

122
    def load_object(
1✔
123
        self, object_uri, object_url, object_version, is_update, object_type
124
    ):
125
        try:
×
126
            obj_info = self.query_objects.get(object_uri)
×
127
            if (
×
128
                obj_info
129
                and obj_info["endpoint_obj"]
130
                and (obj_info["version"] >= object_version)
131
            ):
132
                logger.info("Received load message for object already loaded")
×
133

134
                return DownloadSkipped(
×
135
                    object_uri,
136
                    obj_info["version"],
137
                    "Object with greater " "or equal version already loaded",
138
                )
139
            else:
140
                if object_uri not in self.query_objects:
×
141
                    self.query_objects[object_uri] = {
×
142
                        "version": object_version,
143
                        "type": object_type,
144
                        "endpoint_obj": None,
145
                        "status": "LoadInProgress",
146
                        "last_error": None,
147
                    }
148
                else:
149
                    self.query_objects[object_uri]["status"] = "LoadInProgress"
×
150

151
                self.EXECUTOR.submit(
×
152
                    self._load_object,
153
                    object_uri,
154
                    object_url,
155
                    object_version,
156
                    is_update,
157
                    object_type,
158
                )
159

160
                return LoadInProgress(
×
161
                    object_uri, object_url, object_version, is_update, object_type
162
                )
163
        except Exception as e:
×
164
            logger.exception(e)
×
165
            logger.error(
×
166
                f"Unable to load QueryObject: path={object_url}, " f"error={str(e)}"
167
            )
168

169
            self.query_objects[object_uri] = {
×
170
                "version": object_version,
171
                "type": object_type,
172
                "endpoint_obj": None,
173
                "status": "LoadFailed",
174
                "last_error": str(e),
175
            }
176

177
            return LoadFailed(object_uri, object_version, str(e))
×
178

179
    def delete_objects(self, object_uris):
1✔
180
        """Delete one or more objects from the query_objects map"""
181
        if isinstance(object_uris, list):
×
182
            deleted = []
×
183
            for uri in object_uris:
×
184
                deleted.extend(self.delete_objects(uri).uris)
×
185
            return ObjectsDeleted(deleted)
×
186
        elif isinstance(object_uris, str):
×
187
            deleted_obj = self.query_objects.pop(object_uris, None)
×
188
            if deleted_obj:
×
189
                return ObjectsDeleted([object_uris])
×
190
            else:
191
                logger.warning(
×
192
                    f"Received message to delete query object "
193
                    f"that doesn't exist: "
194
                    f"object_uris={object_uris}"
195
                )
196
                return ObjectsDeleted([])
×
197
        else:
198
            logger.error(
×
199
                f"Unexpected input to delete objects: input={object_uris}, "
200
                f'info="Input should be list or str. '
201
                f'Type: {type(object_uris)}"'
202
            )
203
            return ObjectsDeleted([])
×
204

205
    def flush_objects(self):
1✔
206
        """Flush objects from the query_objects map"""
207
        logger.debug("Flushing query objects")
×
208
        n = len(self.query_objects)
×
209
        self.query_objects.clear()
×
210
        return ObjectsFlushed(n, 0)
×
211

212
    def count_objects(self):
1✔
213
        """Count the number of Loaded QueryObjects stored in memory"""
214
        count = 0
×
215
        for uri, po in self.query_objects.items():
×
216
            if po["endpoint_obj"] is not None:
×
217
                count += 1
×
218
        return ObjectCount(count)
×
219

220
    def list_objects(self):
1✔
221
        """List the objects as (URI, version) pairs"""
222

223
        objects = {}
×
224
        for (uri, obj_info) in self.query_objects.items():
×
225
            objects[uri] = {
×
226
                "version": obj_info["version"],
227
                "type": obj_info["type"],
228
                "status": obj_info["status"],
229
                "reason": obj_info["last_error"],
230
            }
231

232
        return ObjectList(objects)
×
233

234
    def query(self, object_uri, params, uid):
1✔
235
        """Execute a QueryObject query"""
236
        logger.debug(f"Querying Python service {object_uri}...")
×
237
        try:
×
238
            if not isinstance(params, dict) and not isinstance(params, list):
×
239
                return QueryFailed(
×
240
                    uri=object_uri,
241
                    error=(
242
                        "Query parameter needs to be a dictionary or a list"
243
                        f". Given value is of type {type(params)}"
244
                    ),
245
                )
246

247
            obj_info = self.query_objects.get(object_uri)
×
248
            logger.debug(f"Found object {obj_info}")
×
249
            if obj_info:
×
250
                pred_obj = obj_info["endpoint_obj"]
×
251
                version = obj_info["version"]
×
252

253
                if not pred_obj:
×
254
                    return QueryFailed(
×
255
                        uri=object_uri,
256
                        error=(
257
                            "There is no query object associated to the "
258
                            f"endpoint: {object_uri}"
259
                        ),
260
                    )
261

262
                logger.debug(f"Querying endpoint with params ({params})...")
×
263
                if isinstance(params, dict):
×
264
                    result = pred_obj.query(**params)
×
265
                else:
266
                    result = pred_obj.query(*params)
×
267

268
                return QuerySuccessful(object_uri, version, result)
×
269
            else:
270
                return UnknownURI(object_uri)
×
271
        except Exception as e:
×
272
            logger.exception(e)
×
273
            err_msg = format_exception(e, "/query")
×
274
            logger.error(err_msg)
×
275
            return QueryFailed(uri=object_uri, error=err_msg)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc