• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / cb3ae6f4-72a6-4841-bd9c-41168249ca4d

pending completion
cb3ae6f4-72a6-4841-bd9c-41168249ca4d

push

circle-ci

GitHub
docs: update links (#577)

8149 of 8738 relevant lines covered (93.26%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.67
/eva/server/db_api.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import asyncio
1✔
16
import base64
1✔
17
import os
1✔
18
import random
1✔
19

20
from eva.models.server.response import Response
1✔
21
from eva.server.async_protocol import EvaClient
1✔
22

23

24
class EVAConnection:
1✔
25
    def __init__(self, transport, protocol):
1✔
26
        self._transport = transport
1✔
27
        self._protocol = protocol
1✔
28
        self._cursor = None
1✔
29

30
    def cursor(self):
1✔
31
        # One unique cursor for one connection
32
        if self._cursor is None:
1✔
33
            self._cursor = EVACursor(self)
1✔
34
        return self._cursor
1✔
35

36
    @property
1✔
37
    def protocol(self):
1✔
38
        return self._protocol
1✔
39

40

41
class EVACursor(object):
1✔
42
    def __init__(self, connection):
1✔
43
        self._connection = connection
1✔
44
        self._pending_query = False
1✔
45

46
    @property
1✔
47
    def connection(self):
1✔
48
        return self._connection
1✔
49

50
    async def execute_async(self, query: str):
1✔
51
        """
52
        Send query to the EVA server.
53
        """
54
        if self._pending_query:
1✔
55
            raise SystemError(
1✔
56
                "EVA does not support concurrent queries. \
57
                    Call fetch_all() to complete the pending query"
58
            )
59
        query = self._upload_transformation(query)
1✔
60
        await self.connection.protocol.send_message(query)
1✔
61
        self._pending_query = True
1✔
62

63
    async def fetch_one_async(self) -> Response:
1✔
64
        """
65
        fetch_one returns one batch instead of one row for now.
66
        """
67
        try:
1✔
68
            message = await self.connection.protocol.queue.get()
1✔
69
            response = await asyncio.coroutine(Response.deserialize)(message)
1✔
70
        except Exception as e:
×
71
            raise e
×
72
        self._pending_query = False
1✔
73
        return response
1✔
74

75
    async def fetch_all_async(self) -> Response:
1✔
76
        """
77
        fetch_all is the same as fetch_one for now.
78
        """
79
        return await self.fetch_one_async()
×
80

81
    def _upload_transformation(self, query: str) -> str:
1✔
82
        """
83
        Special case:
84
         - UPLOAD: the client read the file and uses base64 to encode
85
         the content into a string.
86
        """
87
        if "UPLOAD" in query:
1✔
88
            query_list = query.split()
1✔
89
            file_path = query_list[2][1:-1]
1✔
90
            dst_path = os.path.basename(file_path)
1✔
91

92
            try:
1✔
93
                with open(file_path, "rb") as f:
1✔
94
                    bytes_read = f.read()
1✔
95
                    b64_string = str(base64.b64encode(bytes_read))
1✔
96
                    query = f"UPLOAD PATH '{dst_path}' BLOB \"{b64_string}\""
1✔
97

98
                    for token in query_list[3:]:
1✔
99
                        query += token + " "
1✔
100
            except Exception as e:
1✔
101
                raise e
1✔
102

103
        return query
1✔
104

105
    def __getattr__(self, name):
1✔
106
        """
107
        Auto generate sync function calls from async
108
        Sync function calls should not be used in an async environment.
109
        """
110
        func = object.__getattribute__(self, "%s_async" % name)
1✔
111
        if not asyncio.iscoroutinefunction(func):
1✔
112
            raise AttributeError
×
113

114
        def func_sync(*args, **kwargs):
1✔
115
            loop = asyncio.get_event_loop()
1✔
116
            res = loop.run_until_complete(func(*args, **kwargs))
1✔
117
            return res
1✔
118

119
        return func_sync
1✔
120

121

122
async def connect_async(host: str, port: int, max_retry_count: int = 3):
1✔
123
    loop = asyncio.get_event_loop()
1✔
124

125
    retries = max_retry_count * [1]
1✔
126

127
    while True:
128
        try:
1✔
129
            transport, protocol = await loop.create_connection(
1✔
130
                lambda: EvaClient(), host, port
131
            )
132

133
        except Exception as e:
1✔
134
            if not retries:
1✔
135
                raise e
1✔
136
            await asyncio.sleep(retries.pop(0) - random.random())
1✔
137
        else:
138
            break
×
139

140
    return EVAConnection(transport, protocol)
1✔
141

142

143
def connect(host: str, port: int, max_retry_count: int = 3):
1✔
144
    loop = asyncio.get_event_loop()
1✔
145
    return loop.run_until_complete(connect_async(host, port, max_retry_count))
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc