• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kore-ledger / rush-rs / 14218476216

02 Apr 2025 11:27AM UTC coverage: 80.193% (+0.2%) from 80.041%
14218476216

push

github

AKALugo
feat: better runner, syncs stops and sync childs stops

90 of 109 new or added lines in 6 files covered. (82.57%)

3 existing lines in 3 files now uncovered.

1992 of 2484 relevant lines covered (80.19%)

5.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
/actor/src/handler.rs
1
// Copyright 2025 Kore Ledger, SL
2
// SPDX-License-Identifier: Apache-2.0
3

4
use crate::{
5
    ActorPath, Error,
6
    actor::{Actor, ActorContext, Handler},
7
};
8

9
use async_trait::async_trait;
10

11
use tokio::sync::{mpsc, oneshot};
12

13
use tracing::{debug, error};
14

15
use std::marker::PhantomData;
16

17
/// Message handler trait for actor messages.
18
#[async_trait]
19
pub trait MessageHandler<A: Actor>: Send + Sync {
20
    /// Handles the message.
21
    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext<A>);
22
}
23

24
/// Internal actor message.
25
struct ActorMessage<A>
26
where
27
    A: Actor + Handler<A>,
28
{
29
    message: A::Message,
30
    sender: ActorPath,
31
    rsvp: Option<oneshot::Sender<Result<A::Response, Error>>>,
32
    _phantom_actor: PhantomData<A>,
33
}
34

35
/// Internal actor message implementation.
36
impl<A> ActorMessage<A>
37
where
38
    A: Actor + Handler<A>,
39
{
40
    /// Creates an internal actor message from a message, its sender path, and an optional response sender.
41
    pub fn new(
47✔
42
        message: A::Message,
47✔
43
        sender: ActorPath,
47✔
44
        rsvp: Option<oneshot::Sender<Result<A::Response, Error>>>,
47✔
45
    ) -> Self {
47✔
46
        debug!("Creating new internal actor message.");
47✔
47
        Self {
47✔
48
            message,
47✔
49
            sender,
47✔
50
            rsvp,
47✔
51
            _phantom_actor: PhantomData,
47✔
52
        }
47✔
53
    }
47✔
54
}
55

56
/// Message handler implementation for internal actor message.
57
#[async_trait]
58
impl<A> MessageHandler<A> for ActorMessage<A>
59
where
60
    A: Actor + Handler<A>,
61
{
62
    async fn handle(&mut self, actor: &mut A, ctx: &mut ActorContext<A>) {
45✔
63
        debug!("Handling internal message.");
45✔
64
        
65
            debug!("Handling message.");
45✔
66
            let result = actor
45✔
67
                .handle_message(self.sender.clone(), self.message.clone(), ctx)
45✔
68
                .await;
45✔
69

70
            if let Some(rsvp) = self.rsvp.take() {
45✔
71
                debug!("Sending back response (if any).");
24✔
72
                rsvp.send(result).unwrap_or_else(|_failed| {
24✔
73
                    error!("Failed to send back response!"); // GRCOV-LINE
×
74
                }) // GRCOV-LINE
24✔
75
            }
21✔
76
    }
90✔
77
}
78

79
/// Boxed message handler.
80
pub type BoxedMessageHandler<A> = Box<dyn MessageHandler<A>>;
81

82
/// Mailbox receiver.
83
pub type MailboxReceiver<A> = mpsc::UnboundedReceiver<BoxedMessageHandler<A>>;
84

85
/// Mailbox sender.
86
pub type MailboxSender<A> = mpsc::UnboundedSender<BoxedMessageHandler<A>>;
87

88
/// Mailbox.
89
pub type Mailbox<A> = (MailboxSender<A>, MailboxReceiver<A>);
90

91
/// Mailbox factory.
92
pub fn mailbox<A>() -> Mailbox<A> {
16✔
93
    mpsc::unbounded_channel()
16✔
94
}
16✔
95

96
/// Handle reference used to send messages through the mailbox sender.
97
pub struct HandleHelper<A> {
98
    sender: MailboxSender<A>,
99
}
100

101
impl<A> HandleHelper<A>
102
where
103
    A: Actor + Handler<A>,
104
{
105
    /// Creates a new handle reference.
106
    pub(crate) fn new(sender: MailboxSender<A>) -> Self {
15✔
107
        debug!("Creating new handle reference.");
15✔
108
        Self { sender }
15✔
109
    }
15✔
110

111
    /// Tells a message to the actor without waiting for a response.
112
    pub(crate) async fn tell(
23✔
113
        &self,
23✔
114
        sender: ActorPath,
23✔
115
        message: A::Message,
23✔
116
    ) -> Result<(), Error> {
23✔
117
        debug!("Telling message to actor from handle reference.");
23✔
118
        let msg = ActorMessage::new(message, sender, None);
23✔
119
        if let Err(error) = self.sender.send(Box::new(msg)) {
23✔
120
            debug!("Failed to tell message! {}", error.to_string()); // GRCOV-START
1✔
121
            Err(Error::Send(error.to_string()))
1✔
122
        } else {
123
            // GRCOV-END
124
            debug!("Message sent successfully.");
22✔
125
            Ok(())
22✔
126
        }
127
    }
23✔
128

129
    /// Asks a message to the actor and awaits its response.
130
    pub(crate) async fn ask(
24✔
131
        &self,
24✔
132
        sender: ActorPath,
24✔
133
        message: A::Message,
24✔
134
    ) -> Result<A::Response, Error> {
24✔
135
        debug!("Asking message to actor from handle reference.");
24✔
136
        let (response_sender, response_receiver) = oneshot::channel();
24✔
137
        let msg =
24✔
138
            ActorMessage::new(message, sender, Some(response_sender));
24✔
139
        if let Err(error) = self.sender.send(Box::new(msg)) {
24✔
140
            error!("Failed to ask message! {}", error.to_string()); // GRCOV-START
×
141
            Err(Error::Send(error.to_string()))
×
142
        } else {
143
            // GRCOV-END
144
            response_receiver
24✔
145
                .await
24✔
146
                .map_err(|error| Error::Send(error.to_string()))? // GRCOV-LINE
24✔
147
        }
148
    }
24✔
149

150
    /// Waits until the mailbox receiver has been closed or dropped; this does not itself close the sender.
UNCOV
151
    pub async fn close(&self) {
×
152
        self.sender.closed().await;
×
153
    }
×
154

155
    /// Returns `true` if the mailbox channel is closed (the receiver was closed or dropped).
156
    pub fn is_closed(&self) -> bool {
×
157
        self.sender.is_closed()
×
158
    }
×
159
}
160

161
impl<A> Clone for HandleHelper<A> {
162
    fn clone(&self) -> Self {
30✔
163
        Self {
30✔
164
            sender: self.sender.clone(),
30✔
165
        }
30✔
166
    }
30✔
167
}
168

169
#[cfg(test)]
170
mod tests {
171

172
    use super::*;
173

174
    #[test]
175
    fn test_mailbox() {
1✔
176
        let (sender, receiver) = mailbox::<()>();
1✔
177
        assert_eq!(sender.is_closed(), false);
1✔
178
        assert_eq!(receiver.is_closed(), false);
1✔
179
    }
1✔
180
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc