• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paticcaa / pain / 17488410533

05 Sep 2025 08:47AM UTC coverage: 53.819% (-0.8%) from 54.596%
17488410533

push

github

ivanallen
register manusya to deva

Signed-off-by: allen <1007729991@qq.com>

1637 of 4154 branches covered (39.41%)

Branch coverage included in aggregate %.

108 of 140 new or added lines in 4 files covered. (77.14%)

2 existing lines in 1 file now uncovered.

1865 of 2353 relevant lines covered (79.26%)

401.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.4
/src/deva/deva.cc
1
#include "deva/deva.h"
2
#include <pain/base/plog.h>
3
#include <pain/base/uuid.h>
4
#include "common/txn_manager.h"
5
#include "deva/macro.h"
6

7
#define DEVA_METHOD(name)                                                                                              \
8
    Status Deva::name([[maybe_unused]] int32_t version,                                                                \
9
                      [[maybe_unused]] const pain::proto::deva::store::name##Request* request,                         \
10
                      [[maybe_unused]] pain::proto::deva::store::name##Response* response,                             \
11
                      [[maybe_unused]] int64_t index)
12

13
namespace pain::deva {
14

15
Status Deva::create(const std::string& path, const UUID& id, FileType type) {
74✔
16
    SPAN(span);
370!
17
    PLOG_DEBUG(("desc", "create")("path", path)("id", id.str())("type", type));
74✔
18
    // get parent
19
    std::filesystem::path file_path(path);
74!
20
    auto dir = file_path.parent_path();
74!
21
    auto filename = file_path.filename();
74!
22
    auto file_type = FileType::kFile;
74✔
23
    UUID parent_dir_uuid;
74✔
24
    auto status = _namespace.lookup(dir.c_str(), &parent_dir_uuid, &file_type);
74!
25
    if (!status.ok()) {
74!
26
        return status;
×
27
    }
28

29
    if (file_type != FileType::kDirectory) {
74!
30
        return Status(EINVAL, fmt::format("{} is not a directory", dir.c_str()));
×
31
    }
32

33
    UUID file_uuid(id.high(), id.low());
74✔
34
    status = _namespace.create(parent_dir_uuid, filename, type, file_uuid);
74!
35
    if (!status.ok()) {
74!
36
        return status;
×
37
    }
38

39
    return Status::OK();
74✔
40
}
222✔
41

42
DEVA_METHOD(CreateFile) {
36✔
43
    SPAN(span);
180!
44
    PLOG_DEBUG(("desc", "create_file")("version", version)("index", index)("request", request->DebugString()));
36✔
45
    auto& path = request->path();
36!
46
    auto& file_id = request->file_id();
36!
47
    UUID file_uuid(file_id.high(), file_id.low());
36!
48
    auto status = create(path, file_uuid, FileType::kFile);
36!
49
    if (!status.ok()) {
36!
50
        return status;
×
51
    }
52
    auto file_info = response->mutable_file_info();
36!
53
    file_info->mutable_file_id()->set_high(file_uuid.high());
36!
54
    file_info->mutable_file_id()->set_low(file_uuid.low());
36!
55
    file_info->set_type(pain::proto::FileType::FILE_TYPE_FILE);
36!
56
    file_info->set_size(0);
36!
57
    file_info->set_atime(request->atime());
36!
58
    file_info->set_mtime(request->mtime());
36!
59
    file_info->set_ctime(request->ctime());
36!
60
    file_info->set_mode(request->mode());
36!
61
    file_info->set_uid(request->uid());
36!
62
    file_info->set_gid(request->gid());
36!
63
    status = update_file_info(file_uuid, *file_info);
36!
64
    if (!status.ok()) {
36!
NEW
65
        return status;
×
66
    }
67
    return Status::OK();
36✔
68
}
108✔
69

70
DEVA_METHOD(CreateDir) {
38✔
71
    SPAN(span);
190!
72
    PLOG_DEBUG(("desc", "create_dir")("version", version)("index", index)("request", request->DebugString()));
38✔
73
    auto& path = request->path();
38!
74
    auto& dir_id = request->dir_id();
38!
75
    UUID dir_uuid(dir_id.high(), dir_id.low());
38!
76
    auto status = create(path, dir_uuid, FileType::kDirectory);
38!
77
    if (!status.ok()) {
38!
78
        return status;
×
79
    }
80
    auto file_info = response->mutable_file_info();
38!
81
    file_info->mutable_file_id()->set_high(dir_uuid.high());
38!
82
    file_info->mutable_file_id()->set_low(dir_uuid.low());
38!
83
    file_info->set_type(pain::proto::FileType::FILE_TYPE_DIRECTORY);
38!
84
    file_info->set_size(0);
38!
85
    file_info->set_atime(request->atime());
38!
86
    file_info->set_mtime(request->mtime());
38!
87
    file_info->set_ctime(request->ctime());
38!
88
    file_info->set_mode(request->mode());
38!
89
    file_info->set_uid(request->uid());
38!
90
    file_info->set_gid(request->gid());
38!
91
    status = update_file_info(dir_uuid, *file_info);
38!
92
    if (!status.ok()) {
38!
NEW
93
        return status;
×
94
    }
95
    return Status::OK();
38✔
96
}
114✔
97

98
DEVA_METHOD(ReadDir) {
2✔
99
    SPAN(span);
10!
100
    PLOG_DEBUG(("desc", "read_dir")("version", version)("index", index)("request", request->DebugString()));
2✔
101
    auto& path = request->path();
2!
102
    UUID parent_dir_uuid;
2✔
103
    FileType file_type = FileType::kNone;
2✔
104
    auto status = _namespace.lookup(path.c_str(), &parent_dir_uuid, &file_type);
2!
105
    if (!status.ok()) {
2!
106
        return status;
×
107
    }
108
    if (file_type != FileType::kDirectory) {
2!
109
        return Status(EINVAL, fmt::format("{} is not a directory", path.c_str()));
×
110
    }
111
    std::list<DirEntry> entries;
2✔
112
    _namespace.list(parent_dir_uuid, &entries);
2!
113
    for (auto& entry : entries) {
23✔
114
        auto dir_entry = response->add_entries();
21!
115
        dir_entry->mutable_file_id()->set_high(entry.inode.high());
21!
116
        dir_entry->mutable_file_id()->set_low(entry.inode.low());
21!
117
        dir_entry->set_type(static_cast<pain::proto::FileType>(entry.type));
21!
118
        dir_entry->set_name(std::move(entry.name));
21✔
119
    }
120
    return Status::OK();
2✔
121
}
6✔
122

123
DEVA_METHOD(RemoveFile) {
×
124
    SPAN(span);
×
125
    PLOG_INFO(("desc", "remove_file")("version", version)("index", index));
×
126
    return Status::OK();
×
127
}
×
128

129
DEVA_METHOD(SealFile) {
×
130
    SPAN(span);
×
131
    PLOG_INFO(("desc", "seal_file")("version", version)("index", index));
×
132
    return Status::OK();
×
133
}
×
134

135
DEVA_METHOD(CreateChunk) {
×
136
    SPAN(span);
×
137
    PLOG_INFO(("desc", "remove")("version", version)("index", index));
×
138
    return Status::OK();
×
139
}
×
140

141
DEVA_METHOD(CheckInChunk) {
×
142
    SPAN(span);
×
143
    PLOG_INFO(("desc", "seal")("version", version)("index", index));
×
144
    return Status::OK();
×
145
}
×
146

147
DEVA_METHOD(SealChunk) {
×
148
    SPAN(span);
×
149
    PLOG_INFO(("desc", "create_chunk")("version", version)("index", index));
×
150
    return Status::OK();
×
151
}
×
152

153
DEVA_METHOD(SealAndNewChunk) {
×
154
    SPAN(span);
×
155
    PLOG_INFO(("desc", "remove_chunk")("version", version)("index", index));
×
156
    return Status::OK();
×
157
}
×
158

159
DEVA_METHOD(GetFileInfo) {
1✔
160
    SPAN(span);
5!
161
    PLOG_INFO(("desc", "get_file_info")("version", version)("index", index));
1✔
162
    auto& path = request->path();
1!
163
    UUID file_uuid;
1✔
164
    FileType file_type = FileType::kNone;
1✔
165
    auto status = _namespace.lookup(path.c_str(), &file_uuid, &file_type);
1!
166
    if (!status.ok()) {
1!
NEW
167
        return status;
×
168
    }
169
    if (file_type != FileType::kFile) {
1!
NEW
170
        return Status(EINVAL, fmt::format("{} is not a file", path.c_str()));
×
171
    }
172
    proto::FileInfo file_info;
1!
173
    status = get_file_info(file_uuid, &file_info);
1!
174
    if (!status.ok()) {
1!
NEW
175
        return status;
×
176
    }
177
    response->mutable_file_info()->Swap(&file_info);
1!
178
    return Status::OK();
1✔
179
}
3✔
180

181
DEVA_METHOD(ManusyaHeartbeat) {
1✔
182
    SPAN(span);
5!
183
    PLOG_INFO(("desc", "manusya_heartbeat")("version", version)("index", index));
1✔
184
    auto& manusya_id = request->manusya_registration().manusya_id();
1!
185
    // auto& storage_info = request->manusya_registration().storage_info();
186
    // TODO: check cluster id
187

188
    UUID manusya_uuid(manusya_id.uuid().high(), manusya_id.uuid().low());
1!
189
    auto it = _manusya_descriptors.find(manusya_uuid);
1!
190
    if (it == _manusya_descriptors.end()) {
1!
191
        // new manusya
192
        PLOG_INFO(("desc", "new manusya")              //
1✔
193
                  ("manusya_uuid", manusya_uuid.str()) //
194
                  ("ip", manusya_id.ip())              //
195
                  ("port", manusya_id.port()));
196
        ManusyaDescriptor manusya_descriptor;
1✔
197
        manusya_descriptor.ip = manusya_id.ip();
1!
198
        manusya_descriptor.port = manusya_id.port();
1!
199
        manusya_descriptor.uuid = manusya_uuid;
1✔
200
        manusya_descriptor.is_alive = true;
1✔
201
        manusya_descriptor.update_heartbeat();
1✔
202
        _manusya_descriptors[manusya_uuid] = manusya_descriptor;
1!
203
    } else {
1✔
NEW
204
        auto& manusya_descriptor = it->second;
×
NEW
205
        manusya_descriptor.ip = manusya_id.ip();
×
NEW
206
        manusya_descriptor.port = manusya_id.port();
×
NEW
207
        manusya_descriptor.update_heartbeat();
×
208
    }
209
    return Status::OK();
2✔
210
}
3✔
211

212
DEVA_METHOD(ListManusya) {
2✔
213
    SPAN(span);
10!
214
    PLOG_INFO(("desc", "list_manusya")("version", version)("index", index));
2✔
215
    auto manusya_descriptors = response->mutable_manusya_descriptors();
2!
216
    for (auto& [_, manusya_descriptor] : _manusya_descriptors) {
3✔
217
        auto manusya_descriptor_proto = manusya_descriptors->Add();
1!
218
        manusya_descriptor_proto->mutable_manusya_id()->set_ip(manusya_descriptor.ip);
1!
219
        manusya_descriptor_proto->mutable_manusya_id()->set_port(manusya_descriptor.port);
1!
220
        manusya_descriptor_proto->mutable_manusya_id()->mutable_uuid()->set_high(manusya_descriptor.uuid.high());
1!
221
        manusya_descriptor_proto->mutable_manusya_id()->mutable_uuid()->set_low(manusya_descriptor.uuid.low());
1!
222
        manusya_descriptor_proto->mutable_storage_info()->set_cluster_id(manusya_descriptor.cluster_id);
1!
223
        manusya_descriptor_proto->set_is_alive(manusya_descriptor.is_alive);
1!
224
        manusya_descriptor_proto->set_last_heartbeat_time(manusya_descriptor.last_heartbeat_time);
1!
225
    }
226
    return Status::OK();
4✔
227
}
6✔
228

229
Status Deva::set_applied_index(int64_t index) {
74✔
230
    if (index <= _applied_index) {
74!
231
        PLOG_WARN(("desc", "index is applied already")("index", index));
×
232
        return Status::OK();
×
233
    }
234
    auto in_txn = common::TxnManager::instance().in_txn();
74!
235
    auto this_txn = _store->begin_txn();
74!
236
    auto txn = in_txn ? common::TxnManager::instance().get_txn_store() : this_txn.get();
74!
237
    if (txn == nullptr) {
74!
238
        return Status(EIO, "Failed to begin transaction");
×
239
    }
240
    auto status = txn->hset(_meta_key, _applied_index_key, std::to_string(index));
74!
241
    return status;
74!
242
    if (in_txn) {
243
        return Status::OK();
244
    }
245

246
    status = txn->commit();
247
    return status;
248
}
74✔
249

250
Status Deva::update_file_info(const UUID& id, const proto::FileInfo& file_info) {
74✔
251
    auto in_txn = common::TxnManager::instance().in_txn();
74!
252
    auto this_txn = _store->begin_txn();
74!
253
    auto txn = in_txn ? common::TxnManager::instance().get_txn_store() : this_txn.get();
74!
254
    if (txn == nullptr) {
74!
NEW
255
        return Status(EIO, "Failed to begin transaction");
×
256
    }
257
    auto status = txn->hset(_file_info_key, id.str(), file_info.SerializeAsString());
74!
258
    if (!status.ok()) {
74!
NEW
259
        return status;
×
260
    }
261
    if (in_txn) {
74!
262
        return Status::OK();
74✔
263
    }
NEW
264
    status = txn->commit();
×
NEW
265
    if (!status.ok()) {
×
NEW
266
        return status;
×
267
    }
NEW
268
    return status;
×
269
}
74✔
270

271
Status Deva::get_file_info(const UUID& id, proto::FileInfo* file_info) {
1✔
272
    std::string file_info_str;
1✔
273
    auto status = _store->hget(_file_info_key, id.str(), &file_info_str);
1!
274
    if (!status.ok()) {
1!
NEW
275
        return status;
×
276
    }
277
    auto ret = file_info->ParseFromString(file_info_str);
1!
278
    if (!ret) {
1!
NEW
279
        return Status(EIO, "Failed to parse file info");
×
280
    }
281
    return Status::OK();
1✔
282
}
1✔
283

NEW
284
Status Deva::remove_file_info(const UUID& id) {
×
NEW
285
    auto status = _store->hdel(_file_info_key, id.str());
×
NEW
286
    if (!status.ok()) {
×
NEW
287
        return status;
×
288
    }
NEW
289
    return Status::OK();
×
NEW
290
}
×
291

292
Status Deva::save_snapshot(std::string_view path, std::vector<std::string>* files) {
3✔
293
    return _store->check_point(path.data(), files);
3✔
294
}
295

296
Status Deva::load_snapshot(std::string_view path) {
3✔
297
    auto status = _store->recover(path.data());
3!
298
    if (!status.ok()) {
3!
299
        return status;
×
300
    }
301
    // get applied index
302
    std::string applied_index_str;
3✔
303
    status = _store->hget(_meta_key, _applied_index_key, &applied_index_str);
3!
304
    if (!status.ok()) {
3!
305
        return status;
×
306
    }
307
    _applied_index = std::stoll(applied_index_str);
3!
308
    return Status::OK();
3✔
309
}
3✔
310

311
} // namespace pain::deva
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc