• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

piotrmurach / finite_machine / #411

29 Aug 2023 09:37PM UTC coverage: 73.326% (-25.0%) from 98.297%
#411

push

piotrmurach
Add Ruby 3.1, 3.2, JRuby 9.3 and 9.4 to GitHub CI

646 of 881 relevant lines covered (73.33%)

256.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.99
/lib/finite_machine/message_queue.rb
1
# frozen_string_literal: true
2

3
require_relative "listener"
1✔
4
require "thread"
1✔
5

6
module FiniteMachine
1✔
7
  # Allows for storage of asynchronous messages such as events
8
  # and callbacks.
9
  #
10
  # Used internally by {Observer} and {StateMachine}
11
  #
12
  # @api private
13
  class MessageQueue
    # Initialize a message queue; the worker thread is created by {#start}
    #
    # @example
    #   MessageQueue.new
    #
    # @api public
    def initialize
      @not_empty = ConditionVariable.new
      @mutex     = Mutex.new
      @queue     = Queue.new
      @dead      = false
      @listeners = []
      @thread    = nil
    end

    # Start a new thread with a queue of callback events to run
    #
    # The running check and the spawn happen under the same lock so that
    # two concurrent callers cannot both create a worker thread.
    #
    # @return [Thread, nil]
    #   the spawned thread, or nil when the queue is already running
    #
    # @api private
    def start
      @mutex.synchronize do
        # Same condition as #running?, inlined because Mutex is not
        # reentrant and #running? takes the lock itself.
        spawn_thread unless @thread && !@dead
      end
    end

    # Spawn new background thread
    #
    # @return [Thread]
    #
    # @api private
    def spawn_thread
      @thread = Thread.new do
        Thread.current.abort_on_exception = true
        process_events
      end
    end

    # Check if a worker thread was spawned and the queue is not shut down
    #
    # @return [Boolean]
    #
    # @api public
    def running?
      @mutex.synchronize { !@thread.nil? && !@dead }
    end

    # Add asynchronous event to the event queue to process
    #
    # When the queue is already shut down the event is discarded.
    #
    # @example
    #   event_queue << AsyncCall.build(...)
    #
    # @param [AsyncCall] event
    #
    # @return [void]
    #
    # @api public
    def <<(event)
      @mutex.synchronize do
        if @dead
          discard_message(event)
        else
          @queue << event
          @not_empty.signal
        end
      end
    end

    # Add listener to the queue to receive messages
    #
    # @param [Array] args
    #   the arguments forwarded to Listener.new
    #
    # @return [Array<Listener>]
    #   the registered listeners
    #
    # @api public
    def subscribe(*args, &block)
      @mutex.synchronize do
        listener = Listener.new(*args)
        listener.on_delivery(&block)
        @listeners << listener
      end
    end

    # Check if there are any events to handle
    #
    # @example
    #   event_queue.empty?
    #
    # @return [Boolean]
    #
    # @api public
    def empty?
      @mutex.synchronize { @queue.empty? }
    end

    # Check if the event queue is alive
    #
    # @example
    #   event_queue.alive?
    #
    # @return [Boolean]
    #
    # @api public
    def alive?
      @mutex.synchronize { !@dead }
    end

    # Join the event queue from current thread
    #
    # @param [Numeric, nil] timeout
    #   the seconds to wait, nil to wait until the worker finishes
    #
    # @example
    #   event_queue.join
    #
    # @return [nil, Thread]
    #
    # @api public
    def join(timeout = nil)
      return unless @thread

      # Thread#join treats a nil limit as "wait forever".
      @thread.join(timeout)
    end

    # Shut down this event queue and clean it up
    #
    # Marks the queue as dead, wakes up the worker thread so it can exit
    # and discards every message that is still pending.
    #
    # @example
    #   event_queue.shutdown
    #
    # @raise [EventQueueDeadError]
    #   when the queue has already been shut down
    #
    # @return [Boolean]
    #
    # @api public
    def shutdown
      pending = []
      @mutex.synchronize do
        raise EventQueueDeadError, "event queue already dead" if @dead

        @dead = true
        # Drain into a separate array; aliasing @queue and then clearing
        # it would lose the messages before they could be reported.
        pending << @queue.pop(true) until @queue.empty?
        @not_empty.broadcast
      end
      pending.each { |message| discard_message(message) }
      true
    end

    # Get number of events waiting for processing
    #
    # @example
    #   event_queue.size
    #
    # @return [Integer]
    #
    # @api public
    def size
      @mutex.synchronize { @queue.size }
    end

    # Describe this queue with its pending size and dead flag
    #
    # @return [String]
    #
    # @api public
    def inspect
      @mutex.synchronize do
        # Read @queue directly: calling #size here would try to lock the
        # already held, non-reentrant mutex and raise ThreadError.
        "#<#{self.class}:#{object_id.to_s(16)} @size=#{@queue.size}, @dead=#{@dead}>"
      end
    end

    private

    # Notify consumers about process event
    #
    # @param [AsyncCall] event
    #
    # @api private
    def notify_listeners(event)
      @listeners.each { |listener| listener.handle_delivery(event) }
    end

    # Process all the events
    #
    # Runs on the worker thread until the queue is shut down. Listeners
    # are notified and the event dispatched while the mutex is held.
    #
    # @return [void]
    #
    # @api private
    def process_events
      until @dead
        @mutex.synchronize do
          # Also stop waiting once the queue is dead, otherwise the
          # shutdown broadcast would only put the worker back to sleep.
          @not_empty.wait(@mutex) while @queue.empty? && !@dead
          break if @dead

          event = @queue.pop
          break unless event

          notify_listeners(event)
          event.dispatch
        end
      end
    rescue Exception => ex
      Logger.error "Error while running event: #{Logger.format_error(ex)}"
    end

    # Report a message that will never be processed
    #
    # Logs only when Ruby runs in debug mode.
    #
    # @param [Object] message
    #
    # @api private
    def discard_message(message)
      Logger.debug "Discarded message: #{message}" if $DEBUG
    end
  end # MessageQueue
202
end # FiniteMachine
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc