• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 12600607047

03 Jan 2025 04:18PM UTC coverage: 82.044%. First build
12600607047

Pull #14480

github

web-flow
Merge 2145c1a2c into d5a56c20b
Pull Request #14480: Refactor session ↔︎ leader interaction for durable shared subscriptions

112 of 151 new or added lines in 7 files covered. (74.17%)

56558 of 68936 relevant lines covered (82.04%)

15280.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v3.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_ds_shared_sub_proto_v3).
6

7
-behaviour(emqx_bpapi).
8

9
-include_lib("emqx/include/bpapi.hrl").
10

11
-export([
12
    introduced_in/0,
13

14
    borrower_connect/4,
15
    borrower_ping/3,
16
    borrower_disconnect/4,
17
    borrower_update_progress/4,
18
    borrower_revoke_finished/4,
19

20
    leader_connect_response/3,
21
    leader_ping_response/3,
22
    leader_grant/4,
23
    leader_revoke/4,
24
    leader_revoked/4,
25
    leader_invalidate/3
26
]).
27

28
%% @doc Returns the version string "5.9.0".
%% NOTE(review): judging by the name and the emqx_bpapi behaviour declared by
%% this module, this is the release in which this protocol version first
%% shipped -- confirm against the emqx_bpapi callback contract.
introduced_in() -> "5.9.0".
×
30

31
%% Local aliases for types defined in emqx_ds_shared_sub_proto, so the specs
%% in this module can refer to them without the module prefix.
-type share_topic_filter() :: emqx_ds_shared_sub_proto:share_topic_filter().
32
-type stream() :: emqx_ds_shared_sub_proto:stream().
33
-type borrower_id() :: emqx_ds_shared_sub_proto:borrower_id().
34
-type leader() :: emqx_ds_shared_sub_proto:leader().
35
-type leader_stream_progress() :: emqx_ds_shared_sub_proto:leader_stream_progress().
36
-type agent_stream_progress() :: emqx_ds_shared_sub_proto:agent_stream_progress().
37
-type agent_stream_progresses() :: emqx_ds_shared_sub_proto:agent_stream_progresses().
38

39
%%--------------------------------------------------------------------
40
%% Borrower -> Leader messages
41
%%--------------------------------------------------------------------
42

43
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:borrower_connect_v3/3 on
%% `Node' with the leader, the borrower id and the share topic filter.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec borrower_connect(node(), leader(), borrower_id(), share_topic_filter()) -> ok.
borrower_connect(Node, ToLeader, FromBorrowerId, ShareTopicFilter) ->
    Args = [ToLeader, FromBorrowerId, ShareTopicFilter],
    erpc:cast(Node, emqx_ds_shared_sub_proto, borrower_connect_v3, Args).
48

49
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:borrower_ping_v3/2 on
%% `Node' with the leader and the borrower id.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec borrower_ping(node(), leader(), borrower_id()) -> ok.
borrower_ping(Node, ToLeader, FromBorrowerId) ->
    Args = [ToLeader, FromBorrowerId],
    erpc:cast(Node, emqx_ds_shared_sub_proto, borrower_ping_v3, Args).
×
52

53
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:borrower_disconnect_v3/3
%% on `Node' with the leader, the borrower id and the borrower's stream
%% progresses.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec borrower_disconnect(node(), leader(), borrower_id(), agent_stream_progresses()) -> ok.
borrower_disconnect(Node, ToLeader, FromBorrowerId, StreamProgresses) ->
    Args = [ToLeader, FromBorrowerId, StreamProgresses],
    erpc:cast(Node, emqx_ds_shared_sub_proto, borrower_disconnect_v3, Args).
58

59
%% @doc Asynchronously runs
%% emqx_ds_shared_sub_proto:borrower_update_progress_v3/3 on `Node' with the
%% leader, the borrower id and a single stream progress.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec borrower_update_progress(node(), leader(), borrower_id(), agent_stream_progress()) ->
    ok.
borrower_update_progress(Node, ToLeader, FromBorrowerId, StreamProgress) ->
    Args = [ToLeader, FromBorrowerId, StreamProgress],
    erpc:cast(Node, emqx_ds_shared_sub_proto, borrower_update_progress_v3, Args).
65

66
%% @doc Asynchronously runs
%% emqx_ds_shared_sub_proto:borrower_revoke_finished_v3/3 on `Node' with the
%% leader, the borrower id and the stream.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec borrower_revoke_finished(node(), leader(), borrower_id(), stream()) -> ok.
borrower_revoke_finished(Node, ToLeader, FromBorrowerId, Stream) ->
    Args = [ToLeader, FromBorrowerId, Stream],
    erpc:cast(Node, emqx_ds_shared_sub_proto, borrower_revoke_finished_v3, Args).
71

72
%%--------------------------------------------------------------------
73
%% Leader -> Borrower messages
74
%%--------------------------------------------------------------------
75

76
%% @doc Asynchronously runs
%% emqx_ds_shared_sub_proto:leader_connect_response_v3/2 on `Node' with the
%% borrower id and the leader.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_connect_response(node(), borrower_id(), leader()) -> ok.
leader_connect_response(Node, ToBorrowerId, FromLeader) ->
    Args = [ToBorrowerId, FromLeader],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_connect_response_v3, Args).
81

82
%% @doc Asynchronously runs
%% emqx_ds_shared_sub_proto:leader_ping_response_v3/2 on `Node' with the
%% borrower id and the leader.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_ping_response(node(), borrower_id(), leader()) -> ok.
leader_ping_response(Node, ToBorrowerId, FromLeader) ->
    Args = [ToBorrowerId, FromLeader],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_ping_response_v3, Args).
×
85

86
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:leader_grant_v3/3 on
%% `Node' with the borrower id, the leader and the stream progress.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_grant(node(), borrower_id(), leader(), leader_stream_progress()) -> ok.
leader_grant(Node, ToBorrowerId, FromLeader, StreamProgress) ->
    Args = [ToBorrowerId, FromLeader, StreamProgress],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_grant_v3, Args).
91

92
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:leader_revoke_v3/3 on
%% `Node' with the borrower id, the leader and the stream.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_revoke(node(), borrower_id(), leader(), stream()) -> ok.
leader_revoke(Node, ToBorrowerId, FromLeader, Stream) ->
    Args = [ToBorrowerId, FromLeader, Stream],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_revoke_v3, Args).
97

98
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:leader_revoked_v3/3 on
%% `Node' with the borrower id, the leader and the stream.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_revoked(node(), borrower_id(), leader(), stream()) -> ok.
leader_revoked(Node, ToBorrowerId, FromLeader, Stream) ->
    Args = [ToBorrowerId, FromLeader, Stream],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_revoked_v3, Args).
103

104
%% @doc Asynchronously runs emqx_ds_shared_sub_proto:leader_invalidate_v3/2 on
%% `Node' with the borrower id and the leader.
%% Uses erpc:cast/4, so the call returns `ok' without waiting for a result.
-spec leader_invalidate(node(), borrower_id(), leader()) -> ok.
leader_invalidate(Node, ToBorrowerId, FromLeader) ->
    Args = [ToBorrowerId, FromLeader],
    erpc:cast(Node, emqx_ds_shared_sub_proto, leader_invalidate_v3, Args).
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc