• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / cf748690-046a-4f45-983b-b5ca63416eda

pending completion
cf748690-046a-4f45-983b-b5ca63416eda

Pull #582

circle-ci

jarulraj
checkpoint
Pull Request #582: server: asyncio refactoring

150 of 150 new or added lines in 7 files covered. (100.0%)

7887 of 8618 relevant lines covered (91.52%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.46
/eva/server/db_api.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import asyncio
1✔
16
import base64
1✔
17
import os
1✔
18

19
from eva.models.server.response import Response
1✔
20
from eva.utils.logging_manager import logger
1✔
21

22

23
class EVAConnection:
    """Client-side handle pairing the stream reader and writer of one server link.

    The reader and writer are stored as given; they are expected to be the
    pair produced when the connection to the server is opened.
    """

    def __init__(self, reader, writer):
        """Store the stream endpoints; the cursor is created lazily.

        Args:
            reader: Stream used to receive server responses.
            writer: Stream used to send queries to the server.
        """
        self._reader = reader
        self._writer = writer
        # Created on first call to cursor() and then reused.
        self._cursor = None

    def cursor(self):
        """Return the cursor bound to this connection.

        Exactly one cursor exists per connection: the first call creates it
        and every later call returns the same object.
        """
        if self._cursor is None:
            self._cursor = EVACursor(self)
        return self._cursor
×
34

35

36
class EVACursor:
    """Cursor that sends queries over a connection and reads back responses.

    Only one query may be in flight at a time. Every ``<name>_async``
    coroutine method is also reachable as a blocking ``<name>`` method
    (for example ``execute`` / ``fetch_all``) through ``__getattr__``.
    """

    def __init__(self, connection):
        """Bind the cursor to ``connection``; no query is pending initially."""
        self._connection = connection
        self._pending_query = False

    async def execute_async(self, query: str):
        """Send ``query`` to the EVA server.

        The query is normalised to a single ``;``-terminated line, UPLOAD
        statements are rewritten to carry the file content inline, and the
        result is written to the connection followed by a newline.

        Raises:
            SystemError: If a previously sent query has not been fetched yet.
        """
        if self._pending_query:
            raise SystemError(
                "EVA does not support concurrent queries. "
                "Call fetch_all() to complete the pending query"
            )
        query = self._multiline_query_transformation(query)
        query = self._upload_transformation(query)
        self._connection._writer.write((query + "\n").encode())
        await self._connection._writer.drain()
        # Only mark the query as pending once it has actually been flushed.
        self._pending_query = True

    async def fetch_one_async(self) -> "Response":
        """Read one response from the server.

        fetch_one returns one batch instead of one row for now.

        The wire format read here is a line holding the payload length,
        followed by exactly that many bytes of serialized response. When the
        length line is empty, a default ``Response()`` is returned instead.
        Exceptions raised while reading or decoding propagate to the caller
        and leave the pending-query flag set; ``stop_query()`` resets it.
        """
        response = Response()
        prefix = await self._connection._reader.readline()
        if prefix != b"":
            message_length = int(prefix)
            message = await self._connection._reader.readexactly(message_length)
            response = Response.deserialize(message)
        self._pending_query = False
        return response

    async def fetch_all_async(self) -> "Response":
        """Read the response of the pending query.

        fetch_all is the same as fetch_one for now.
        """
        return await self.fetch_one_async()

    def _multiline_query_transformation(self, query: str) -> str:
        """Collapse ``query`` onto one line that ends with a single ``;``."""
        query = query.replace("\n", " ")
        query = query.lstrip()
        # Strip any trailing blanks/semicolons so exactly one ';' is appended.
        query = query.rstrip(" ;")
        query += ";"
        logger.debug("Query: " + query)
        return query

    def _upload_transformation(self, query: str) -> str:
        """Inline the file content into an UPLOAD statement.

        Special case:
         - UPLOAD: the client reads the file and uses base64 to encode
           the content into a string.

        Only statements whose first token is the UPLOAD keyword (compared
        case-insensitively) and that have at least three tokens are
        rewritten; the third token is taken as the quoted local file path.
        Every other query is returned unchanged.

        Raises:
            OSError: If the referenced file cannot be opened or read.
        """
        tokens = query.split()
        # Match on the leading keyword rather than a substring, so that
        # statements merely mentioning "UPLOAD" are not treated as uploads.
        if len(tokens) < 3 or tokens[0].upper() != "UPLOAD":
            return query

        # Drop the surrounding quote characters from the path token.
        file_path = tokens[2][1:-1]
        dst_path = os.path.basename(file_path)

        with open(file_path, "rb") as f:
            bytes_read = f.read()

        # NOTE: str() of a bytes object keeps the b'...' wrapper. This is
        # preserved exactly as before; presumably the server expects this
        # form - confirm before changing the encoding.
        b64_string = str(base64.b64encode(bytes_read))
        upload_query = f"UPLOAD PATH '{dst_path}' BLOB \"{b64_string}\""

        # Re-attach the remaining tokens, each followed by one space.
        return upload_query + "".join(token + " " for token in tokens[3:])

    def stop_query(self):
        """Clear the pending-query flag so a new query can be sent."""
        self._pending_query = False

    def __getattr__(self, name):
        """Auto generate sync function calls from async.

        Called only when normal attribute lookup fails: ``name`` is mapped
        to the ``<name>_async`` attribute and wrapped in a function that
        runs it to completion on the current event loop.

        Sync function calls should not be used in an async environment.

        Raises:
            AttributeError: If no ``<name>_async`` attribute exists.
        """
        try:
            func = object.__getattribute__(self, f"{name}_async")
        except AttributeError:
            # Report the attribute the caller asked for, not the internal
            # "<name>_async" lookup key.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

        def func_sync(*args, **kwargs):
            # The connection's streams must belong to this same loop.
            loop = asyncio.get_event_loop()
            res = loop.run_until_complete(func(*args, **kwargs))
            return res

        return func_sync
×
129

130

131
async def get_connection(host: str, port: int) -> EVAConnection:
    """Open a stream connection to ``host:port`` and wrap it.

    Args:
        host: Server host name or address.
        port: Server TCP port.

    Returns:
        An ``EVAConnection`` holding the reader/writer pair returned by
        ``asyncio.open_connection``.
    """
    reader, writer = await asyncio.open_connection(host, port)
    connection = EVAConnection(reader, writer)
    return connection
×
135

136

137
def connect(host: str, port: int) -> EVAConnection:
    """Synchronously open a connection to the EVA server.

    The connection is established on a persistent current event loop rather
    than through ``asyncio.run()``: ``asyncio.run()`` closes its private
    loop before returning, which would leave the returned streams attached
    to a dead loop and unusable by later blocking cursor calls.

    Must not be called while an event loop is already running in this
    thread; use ``get_connection`` from async code instead.

    Args:
        host: Server host name or address.
        port: Server TCP port.

    Returns:
        The established ``EVAConnection``.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop is set for this thread.
        loop = None
    if loop is None or loop.is_closed():
        # Install a fresh loop so later calls in this thread that look up
        # the current loop find the one the streams were opened on.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    connection = loop.run_until_complete(get_connection(host, port))
    return connection
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc