• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

piotrmurach / finite_machine / #415

11 Sep 2023 08:46PM UTC coverage: 73.326% (-25.0%) from 98.297%
#415

push

piotrmurach
Fix message queue shutdown to raise valid error

1 of 1 new or added line in 1 file covered. (100.0%)

646 of 881 relevant lines covered (73.33%)

250.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.99
/lib/finite_machine/message_queue.rb
1
# frozen_string_literal: true
2

3
require_relative "listener"
1✔
4
require "thread"
1✔
5

6
module FiniteMachine
  # Allows for storage of asynchronous messages such as events
  # and callbacks.
  #
  # Used internally by {Observer} and {StateMachine}
  #
  # All access to the underlying queue, the listeners and the dead flag
  # is guarded by a single non-reentrant mutex, so no method may call
  # another locking method while it already holds the lock.
  #
  # @api private
  class MessageQueue
    # Initialize a message queue
    #
    # No background thread is created here; call {#start} to begin
    # processing queued messages.
    #
    # @example
    #   MessageQueue.new
    #
    # @api public
    def initialize
      @not_empty = ConditionVariable.new
      @mutex     = Mutex.new
      @queue     = Queue.new
      @dead      = false
      @listeners = []
      @thread    = nil
    end

    # Start a new thread with a queue of callback events to run
    #
    # Does nothing when a worker thread already exists and the queue
    # has not been shut down.
    #
    # @return [Thread, nil]
    #   the spawned thread, or nil when already running
    #
    # @api private
    def start
      # The check and the spawn happen under the same lock so that two
      # concurrent callers cannot both create a worker thread. The state
      # is read directly because #running? would try to re-acquire the lock.
      @mutex.synchronize do
        spawn_thread unless @thread && !@dead
      end
    end

    # Spawn new background thread
    #
    # @return [Thread]
    #
    # @api private
    def spawn_thread
      @thread = Thread.new do
        Thread.current.abort_on_exception = true
        process_events
      end
    end

    # Check if a worker thread was created and the queue is still alive
    #
    # @return [Boolean]
    #
    # @api public
    def running?
      !@thread.nil? && alive?
    end

    # Add asynchronous event to the event queue to process
    #
    # When the queue has been shut down the event is discarded instead.
    #
    # @example
    #   event_queue << AsyncCall.build(...)
    #
    # @param [AsyncCall] event
    #
    # @return [void]
    #
    # @api public
    def <<(event)
      @mutex.synchronize do
        if @dead
          discard_message(event)
        else
          @queue << event
          @not_empty.signal
        end
      end
    end

    # Add listener to the queue to receive messages
    #
    # @param [Array] args
    #   the arguments forwarded to Listener.new
    #
    # @yield the block registered as the listener's delivery handler
    #
    # @api public
    def subscribe(*args, &block)
      @mutex.synchronize do
        listener = Listener.new(*args)
        listener.on_delivery(&block)
        @listeners << listener
      end
    end

    # Check if there are any events to handle
    #
    # @example
    #   event_queue.empty?
    #
    # @return [Boolean]
    #
    # @api public
    def empty?
      @mutex.synchronize { @queue.empty? }
    end

    # Check if the event queue is alive
    #
    # @example
    #   event_queue.alive?
    #
    # @return [Boolean]
    #
    # @api public
    def alive?
      @mutex.synchronize { !@dead }
    end

    # Join the event queue from current thread
    #
    # @param [Numeric, nil] timeout
    #   the maximum number of seconds to wait, nil to wait indefinitely
    #
    # @example
    #   event_queue.join
    #
    # @return [nil, Thread]
    #   nil when no thread was started or the timeout expired
    #
    # @api public
    def join(timeout = nil)
      return unless @thread

      # Thread#join treats a nil limit as "no limit".
      @thread.join(timeout)
    end

    # Shut down this message queue and clean it up
    #
    # Marks the queue as dead, wakes up the worker thread so that it can
    # exit, and discards every message that was still waiting.
    #
    # @example
    #   message_queue.shutdown
    #
    # @raise [FiniteMachine::MessageQueueDeadError]
    #   when the queue has already been shut down
    #
    # @return [Boolean]
    #
    # @api public
    def shutdown
      pending = []
      @mutex.synchronize do
        # Checked under the lock so that concurrent shutdowns cannot
        # both pass the guard.
        raise MessageQueueDeadError, "message queue already dead" if @dead

        @dead = true
        @not_empty.broadcast

        # Move the messages out (oldest first) instead of aliasing and
        # clearing @queue, which would leave nothing to discard.
        pending << @queue.pop until @queue.empty?
      end
      pending.each { |message| discard_message(message) }
      true
    end

    # Get number of events waiting for processing
    #
    # @example
    #   event_queue.size
    #
    # @return [Integer]
    #
    # @api public
    def size
      @mutex.synchronize { @queue.size }
    end

    # Describe this queue with its size and dead flag
    #
    # @return [String]
    #
    # @api public
    def inspect
      @mutex.synchronize do
        # Read @queue directly: calling #size here would try to lock the
        # mutex this thread already holds and raise ThreadError.
        "#<#{self.class}:#{object_id.to_s(16)} " \
          "@size=#{@queue.size}, @dead=#{@dead}>"
      end
    end

    private

    # Notify consumers about process event
    #
    # @param [AsyncCall] event
    #
    # @api private
    def notify_listeners(event)
      @listeners.each { |listener| listener.handle_delivery(event) }
    end

    # Process all the events
    #
    # Runs until the queue is marked dead. Listeners are notified and the
    # event is dispatched while the lock is held.
    #
    # @api private
    def process_events
      until @dead
        @mutex.synchronize do
          # Wake up for new work or for shutdown; waiting on emptiness
          # alone would park the worker forever once the queue is dead.
          @not_empty.wait(@mutex) while @queue.empty? && !@dead
          break if @dead

          event = @queue.pop
          break unless event

          notify_listeners(event)
          event.dispatch
        end
      end
    # NOTE(review): rescuing Exception also swallows SystemExit and
    # Interrupt raised by callbacks; kept as-is to preserve behaviour.
    rescue Exception => ex # rubocop:disable Lint/RescueException
      Logger.error "Error while running event: #{Logger.format_error(ex)}"
    end

    # Log a message that will never be processed
    #
    # Only logs when Ruby runs in debug mode.
    #
    # @param [Object] message
    #
    # @api private
    def discard_message(message)
      Logger.debug "Discarded message: #{message}" if $DEBUG
    end
  end # MessageQueue
end # FiniteMachine
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc