/mergin/client_push.py

"""
Utilities to push projects asynchronously. Starting a push does not block:

job = push_project_async(mergin_client, '/tmp/my_project')

Then we need to wait until the upload is finished - either by periodically
calling push_project_is_running(job), which just returns True/False, or by calling
push_project_wait(job), which blocks the current thread (not good for GUI).
To finish the upload job, we have to call push_project_finalize(job).
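
A minimal polling sketch (illustrative only; mergin_client is assumed to be an
already configured MerginClient instance and the project path is just an example):

    import time

    job = push_project_async(mergin_client, '/tmp/my_project')
    if job is not None:  # push_project_async() returns None when there is nothing to upload
        while push_project_is_running(job):
            time.sleep(0.1)  # or return control to the event loop in a GUI
        push_project_finalize(job)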
"""

import json
import hashlib
import pprint
import tempfile
import concurrent.futures
import os

from .common import UPLOAD_CHUNK_SIZE, ClientError
from .merginproject import MerginProject


class UploadJob:
    """Keeps all the important data about a pending upload job"""

    def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir):
        self.project_path = project_path  # full project name ("username/projectname")
        self.changes = changes  # dictionary of local changes to the project
        self.transaction_id = transaction_id  # ID of the transaction assigned by the server
        self.total_size = 0  # size of data to upload (in bytes)
        self.transferred_size = 0  # size of data already uploaded (in bytes)
        self.upload_queue_items = []  # list of items to upload in the background
        self.mp = mp  # MerginProject instance
        self.mc = mc  # MerginClient instance
        self.tmp_dir = tmp_dir  # TemporaryDirectory instance for any temp file we need
        self.is_cancelled = False  # whether upload has been cancelled
        self.executor = None  # ThreadPoolExecutor that manages background upload tasks
        self.futures = []  # list of futures submitted to the executor
        self.server_resp = None  # server response when transaction is finished

    def dump(self):
        print("--- JOB ---", self.total_size, "bytes")
        for item in self.upload_queue_items:
            print("- {} {} {}".format(item.file_path, item.chunk_index, item.size))
        print("--- END ---")


class UploadQueueItem:
    """A single chunk of data that needs to be uploaded"""

    def __init__(self, file_path, size, transaction_id, chunk_id, chunk_index):
        self.file_path = file_path  # full path to the file
        self.size = size  # size of the chunk in bytes
        self.chunk_id = chunk_id  # ID of the chunk within transaction
        self.chunk_index = chunk_index  # index (starting from zero) of the chunk within the file
        self.transaction_id = transaction_id  # ID of the transaction

    def upload_blocking(self, mc, mp):
        with open(self.file_path, "rb") as file_handle:
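            # each chunk lives at a fixed offset (chunk_index * UPLOAD_CHUNK_SIZE)
            # within the file, so chunks can be read and uploaded independently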
            file_handle.seek(self.chunk_index * UPLOAD_CHUNK_SIZE)
            data = file_handle.read(UPLOAD_CHUNK_SIZE)

            checksum = hashlib.sha1()
            checksum.update(data)

            mp.log.debug(f"Uploading {self.file_path} part={self.chunk_index}")

            headers = {"Content-Type": "application/octet-stream"}
            resp = mc.post("/v1/project/push/chunk/{}/{}".format(self.transaction_id, self.chunk_id), data, headers)
            resp_dict = json.load(resp)
            mp.log.debug(f"Upload finished: {self.file_path}")
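            # verify that the chunk stored on the server matches what we read locally;
            # on a mismatch, try to cancel the whole transaction and bail out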
            if not (resp_dict["size"] == len(data) and resp_dict["checksum"] == checksum.hexdigest()):
                try:
                    mc.post("/v1/project/push/cancel/{}".format(self.transaction_id))
                except ClientError:
                    pass
                raise ClientError("Mismatch between uploaded file chunk {} and local one".format(self.chunk_id))


def push_project_async(mc, directory):
    """Starts push of a project and returns the pending upload job (or None if there is nothing to upload)"""

    mp = MerginProject(directory)
    if mp.has_unfinished_pull():
        raise ClientError("Project is in unfinished pull state. Please resolve unfinished pull and try again.")

    project_path = mp.project_full_name()
    local_version = mp.version()

    mp.log.info("--- version: " + mc.user_agent_info())
    mp.log.info(f"--- start push {project_path}")

    try:
        server_info = mc.project_info(project_path)
    except ClientError as err:
        mp.log.error("Error getting project info: " + str(err))
        mp.log.info("--- push aborted")
        raise
    server_version = server_info["version"] if server_info["version"] else "v0"

    mp.log.info(f"got project info: local version {local_version} / server version {server_version}")

    username = mc.username()
    # The "permissions" field contains information about the update, delete and upload privileges
    # of the user on a specific project. This is more accurate information than the "writernames"
    # field, as it takes namespace privileges into account. So we only have to check "permissions",
    # namely the "upload" one.
    if not mc.has_writing_permissions(project_path):
        mp.log.error(f"--- push {project_path} - username {username} does not have write access")
        raise ClientError(f"You do not seem to have write access to the project (username '{username}')")

    if local_version != server_version:
        mp.log.error(f"--- push {project_path} - not up to date (local {local_version} vs server {server_version})")
        raise ClientError(
            "There is a new version of the project on the server. Please update your local copy."
            + f"\n\nLocal version: {local_version}\nServer version: {server_version}"
        )

    changes = mp.get_push_changes()
    mp.log.debug("push changes:\n" + pprint.pformat(changes))
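    # Illustrative shape of the changes dictionary, inferred from how it is used below
    # (not an exhaustive schema - the authoritative source is MerginProject.get_push_changes()):
    #   {"added":   [{"path": ..., "size": ..., "chunks": [...], ...}, ...],
    #    "updated": [{"path": ..., "size": ..., "chunks": [...], "diff": {...}, ...}, ...],
    #    ...}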

    tmp_dir = tempfile.TemporaryDirectory(prefix="mergin-py-client-")

    # If there are any versioned files (aka .gpkg) that are not updated through a diff,
    # we need to make a temporary copy somewhere to be sure that we are uploading full content.
    # That's because if there are pending transactions, checkpointing or switching from WAL mode
    # won't work, and we would end up with some changes left in the -wal file which do not get
    # uploaded. The temporary copy, made using geodiff, uses the sqlite backup API and should copy everything.
    for f in changes["updated"]:
        if mp.is_versioned_file(f["path"]) and "diff" not in f:
            mp.copy_versioned_file_for_upload(f, tmp_dir.name)

    for f in changes["added"]:
        if mp.is_versioned_file(f["path"]):
            mp.copy_versioned_file_for_upload(f, tmp_dir.name)

    if not sum(len(v) for v in changes.values()):
        mp.log.info(f"--- push {project_path} - nothing to do")
        return

    # drop internal info from being sent to server
    for item in changes["updated"]:
        item.pop("origin_checksum", None)
    data = {"version": local_version, "changes": changes}

    try:
        resp = mc.post(f"/v1/project/push/{project_path}", data, {"Content-Type": "application/json"})
    except ClientError as err:
        mp.log.error("Error starting transaction: " + str(err))
        mp.log.info("--- push aborted")
        raise
    server_resp = json.load(resp)

    upload_files = data["changes"]["added"] + data["changes"]["updated"]

    transaction_id = server_resp["transaction"] if upload_files else None
    job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir)

    if not upload_files:
        mp.log.info("not uploading any files")
        job.server_resp = server_resp
        push_project_finalize(job)
        return None  # all done - no pending job

    mp.log.info(f"got transaction ID {transaction_id}")

    upload_queue_items = []
    total_size = 0
    # prepare file chunks for upload
    for file in upload_files:
        if "diff" in file:
            # versioned file - uploading diff
            file_location = mp.fpath_meta(file["diff"]["path"])
            file_size = file["diff"]["size"]
        elif "upload_file" in file:
            # versioned file - uploading full (a temporary copy)
            file_location = file["upload_file"]
            file_size = file["size"]
        else:
            # non-versioned file
            file_location = mp.fpath(file["path"])
            file_size = file["size"]

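        # e.g. with an UPLOAD_CHUNK_SIZE of 10 MB (the actual value comes from .common),
        # a 25 MB file would be split into chunks of 10 MB, 10 MB and 5 MB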
        for chunk_index, chunk_id in enumerate(file["chunks"]):
            size = min(UPLOAD_CHUNK_SIZE, file_size - chunk_index * UPLOAD_CHUNK_SIZE)
            upload_queue_items.append(UploadQueueItem(file_location, size, transaction_id, chunk_id, chunk_index))

        total_size += file_size

    job.total_size = total_size
    job.upload_queue_items = upload_queue_items

    mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")

    # start uploads in background
    job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    for item in upload_queue_items:
        future = job.executor.submit(_do_upload, item, job)
        job.futures.append(future)

    return job


def push_project_wait(job):
    """blocks until all upload tasks are finished"""

    concurrent.futures.wait(job.futures)


def push_project_is_running(job):
    """
    Returns true/false depending on whether we have some pending uploads

    It also forwards any exceptions from workers (e.g. some network errors). If an exception
    is raised, it is advised to call push_project_cancel() to abort the job.
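
    A minimal sketch of that pattern (assuming a `job` from push_project_async()
    and `import time`):

        try:
            while push_project_is_running(job):
                time.sleep(0.1)
        except ClientError:
            push_project_cancel(job)
            raise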
    """
216
    for future in job.futures:
×
217
        if future.done() and future.exception() is not None:
×
218
            job.mp.log.error("Error while pushing data: " + str(future.exception()))
×
219
            job.mp.log.info("--- push aborted")
×
220
            raise future.exception()
×
221
        if future.running():
×
222
            return True
×
223
    return False
×
224

225

226
def push_project_finalize(job):
1✔
227
    """
228
    To be called when push in the background is finished and we need to do the finalization
229

230
    This should not be called from a worker thread (e.g. directly from a handler when push is complete).
231

232
    If any of the workers has thrown any exception, it will be re-raised (e.g. some network errors).
233
    That also means that the whole job has been aborted.
234
    """
235

236
    with_upload_of_files = job.executor is not None

    if with_upload_of_files:
        job.executor.shutdown(wait=True)

        # make sure any exceptions from threads are not lost
        for future in job.futures:
            if future.exception() is not None:
                job.mp.log.error("Error while pushing data: " + str(future.exception()))
                job.mp.log.info("--- push aborted")
                raise future.exception()

    if job.transferred_size != job.total_size:
        error_msg = "Transferred size ({}) and expected total size ({}) do not match!".format(
            job.transferred_size, job.total_size
        )
        job.mp.log.error("--- push finish failed! " + error_msg)
        raise ClientError("Upload error: " + error_msg)

    if with_upload_of_files:
        try:
            job.mp.log.info(f"Finishing transaction {job.transaction_id}")
            resp = job.mc.post("/v1/project/push/finish/%s" % job.transaction_id)
            job.server_resp = json.load(resp)
        except ClientError as err:
            # server returns various error messages with filename or something generic
            # it would be better if it returned list of failed files (and reasons) whenever possible
            job.mp.log.error("--- push finish failed! " + str(err))

            # if push finish fails, the transaction is not killed, so we
            # need to cancel it so it does not block further uploads
            job.mp.log.info("canceling the pending transaction...")
            try:
                resp_cancel = job.mc.post("/v1/project/push/cancel/%s" % job.transaction_id)
                job.mp.log.info("cancel response: " + resp_cancel.msg)
            except ClientError as err2:
                job.mp.log.info("cancel response: " + str(err2))
            raise err

    job.mp.update_metadata(job.server_resp)
    try:
        job.mp.apply_push_changes(job.changes)
    except Exception as e:
        job.mp.log.error("Failed to apply push changes: " + str(e))
        job.mp.log.info("--- push aborted")
        raise ClientError("Failed to apply push changes: " + str(e))

    job.tmp_dir.cleanup()  # delete our temporary dir and all its content

    remove_diff_files(job)

    job.mp.log.info("--- push finished - new project version " + job.server_resp["version"])


def push_project_cancel(job):
    """
    To be called (from main thread) to cancel a job that has uploads in progress.
    Returns once all background tasks have exited (may block for a bit of time).
    """

    job.mp.log.info("user cancelled the push...")
    # set job as cancelled
    job.is_cancelled = True

    job.executor.shutdown(wait=True)
    try:
        resp_cancel = job.mc.post("/v1/project/push/cancel/%s" % job.transaction_id)
        job.server_resp = resp_cancel.msg
    except ClientError as err:
        job.mp.log.error("--- push cancelling failed! " + str(err))
        raise err
    job.mp.log.info("--- push cancel response: " + str(job.server_resp))


def _do_upload(item, job):
    """runs in worker thread"""
    if job.is_cancelled:
        return

    item.upload_blocking(job.mc, job.mp)
    job.transferred_size += item.size


def remove_diff_files(job) -> None:
    """Looks for diff files in the job and removes them."""

    for change in job.changes["updated"]:
        if "diff" in change:
            diff_file = job.mp.fpath_meta(change["diff"]["path"])
            if os.path.exists(diff_file):
                os.remove(diff_file)