• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

piotrmurach / finite_machine / #419

07 Oct 2023 04:44PM UTC coverage: 73.22% (-25.1%) from 98.297%
#419

push

piotrmurach
Fix any_event and any_state methods in the define method and definition class

Closes #76

8 of 8 new or added lines in 1 file covered. (100.0%)

648 of 885 relevant lines covered (73.22%)

351.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.99
/lib/finite_machine/message_queue.rb
1
# frozen_string_literal: true
2

3
require_relative "listener"
1✔
4
require "thread"
1✔
5

6
module FiniteMachine
  # Responsible for storage of asynchronous messages such as events
  # and callbacks.
  #
  # Messages are kept in a queue guarded by a single mutex and are
  # processed one at a time by a background thread created in {#start}.
  #
  # Used internally by {Observer}
  #
  # @api private
  class MessageQueue
    # Initialize a MessageQueue
    #
    # @example
    #   message_queue = FiniteMachine::MessageQueue.new
    #
    # @api public
    def initialize
      @not_empty = ConditionVariable.new
      @mutex     = Mutex.new
      @queue     = Queue.new
      @dead      = false
      @listeners = []
      @thread    = nil
    end

    # Start a new thread with a queue of callback events to run
    #
    # The running check and the spawn happen under the same lock so that
    # concurrent callers cannot create more than one worker thread.
    #
    # @example
    #   message_queue.start
    #
    # @return [Thread, nil]
    #   the spawned thread, or nil when the queue is already running
    #
    # @api private
    def start
      @mutex.synchronize do
        # Same condition as `!running?`, evaluated without re-locking
        # the (non-reentrant) mutex.
        spawn_thread if @thread.nil? || @dead
      end
    end

    # Spawn a new background thread
    #
    # @return [Thread]
    #
    # @api private
    def spawn_thread
      @thread = Thread.new do
        Thread.current.abort_on_exception = true
        process_events
      end
    end

    # Check whether or not the message queue is running
    #
    # @example
    #   message_queue.running?
    #
    # @return [Boolean]
    #   true when a thread has been spawned and the queue is not dead
    #
    # @api public
    def running?
      !@thread.nil? && alive?
    end

    # Add an asynchronous event to the message queue to process
    #
    # When the queue has been shut down the event is discarded instead.
    #
    # @example
    #   message_queue << AsyncCall.build(...)
    #
    # @param [FiniteMachine::AsyncCall] event
    #   the event to add
    #
    # @return [void]
    #
    # @api public
    def <<(event)
      @mutex.synchronize do
        if @dead
          discard_message(event)
        else
          @queue << event
          @not_empty.signal
        end
      end
    end

    # Add a listener for the message queue to receive notifications
    #
    # @example
    #   message_queue.subscribe { |event| ... }
    #
    # @param [Array] args
    #   the arguments forwarded to Listener.new
    #
    # @return [void]
    #
    # @api public
    def subscribe(*args, &block)
      @mutex.synchronize do
        listener = Listener.new(*args)
        listener.on_delivery(&block)
        @listeners << listener
      end
    end

    # Check whether or not there are any messages to handle
    #
    # @example
    #   message_queue.empty?
    #
    # @return [Boolean]
    #
    # @api public
    def empty?
      @mutex.synchronize { @queue.empty? }
    end

    # Check whether or not the message queue is alive
    #
    # @example
    #   message_queue.alive?
    #
    # @return [Boolean]
    #   false once {#shutdown} has been called
    #
    # @api public
    def alive?
      @mutex.synchronize { !@dead }
    end

    # Join the message queue from the current thread
    #
    # @param [Numeric, nil] timeout
    #   the time limit in seconds, nil to wait without a limit
    #
    # @example
    #   message_queue.join
    #
    # @return [Thread, nil]
    #   nil when no thread was started or the time limit expired
    #
    # @api public
    def join(timeout = nil)
      return unless @thread

      # Thread#join treats a nil limit as "wait forever"
      @thread.join(timeout)
    end

    # Shut down this message queue and clean it up
    #
    # Marks the queue as dead, hands every pending message to the discard
    # hook and wakes up the worker thread so that it can terminate.
    #
    # @example
    #   message_queue.shutdown
    #
    # @raise [FiniteMachine::MessageQueueDeadError]
    #   when the queue has already been shut down
    #
    # @return [Boolean]
    #
    # @api public
    def shutdown
      pending = []
      @mutex.synchronize do
        # Checked under the lock so two concurrent shutdowns cannot both pass
        raise MessageQueueDeadError, "message queue already dead" if @dead

        @dead = true
        # Move the messages out before discarding them; aliasing @queue and
        # then clearing it would leave nothing to discard.
        pending << @queue.pop(true) until @queue.empty?
        @not_empty.broadcast
      end
      pending.each { |message| discard_message(message) }
      true
    end

    # The number of messages waiting for processing
    #
    # @example
    #   message_queue.size
    #
    # @return [Integer]
    #
    # @api public
    def size
      @mutex.synchronize { @queue.size }
    end

    # Inspect this message queue
    #
    # @example
    #   message_queue.inspect
    #
    # @return [String]
    #
    # @api public
    def inspect
      @mutex.synchronize do
        # Read @queue directly: calling #size here would try to lock the
        # already held mutex and raise ThreadError.
        "#<#{self.class}:#{object_id.to_s(16)} " \
          "@size=#{@queue.size}, @dead=#{@dead}>"
      end
    end

    private

    # Notify listeners about the event
    #
    # @param [FiniteMachine::AsyncCall] event
    #   the event to notify listeners about
    #
    # @return [void]
    #
    # @api private
    def notify_listeners(event)
      @listeners.each { |listener| listener.handle_delivery(event) }
    end

    # Process all the events
    #
    # Runs on the worker thread until the queue is marked as dead. Any
    # exception raised while processing is logged and ends the loop.
    #
    # @return [void]
    #
    # @api private
    def process_events
      until @dead
        @mutex.synchronize do
          # Also stop waiting once the queue is dead, otherwise the
          # broadcast sent by #shutdown could never end this thread.
          @not_empty.wait(@mutex) while @queue.empty? && !@dead
          break if @dead

          event = @queue.pop
          break unless event

          # NOTE(review): listeners and the event run while the mutex is
          # held, so a callback calling back into this queue would hit
          # recursive locking - confirm this is intended.
          notify_listeners(event)
          event.dispatch
        end
      end
    rescue Exception => ex
      # Deliberately broad: every failure on the worker thread is logged
      # rather than propagated.
      Logger.error "Error while running event: #{Logger.format_error(ex)}"
    end

    # Log discarded message
    #
    # Only logs when Ruby runs in debug mode.
    #
    # @param [FiniteMachine::AsyncCall] message
    #   the message to discard
    #
    # @return [void]
    #
    # @api private
    def discard_message(message)
      Logger.debug "Discarded message: #{message}" if $DEBUG
    end
  end # MessageQueue
end # FiniteMachine
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc