• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fast-programmer / outboxer / 16829818360

08 Aug 2025 11:57AM UTC coverage: 97.858% (-0.5%) from 98.381%
16829818360

Pull #330

github

web-flow
Merge 5eb710504 into f1d9dd804
Pull Request #330: Delete unused methods

5 of 5 new or added lines in 2 files covered. (100.0%)

10 existing lines in 1 file now uncovered.

2056 of 2101 relevant lines covered (97.86%)

166.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.56
/lib/outboxer/publisher.rb
1
require "optparse"
1✔
2

3
module Outboxer
1✔
4
  # Manages the publishing of messages.
5
  module Publisher
1✔
6
    module_function
1✔
7

8
    def terminating?
1✔
9
      @status == Status::TERMINATING
107,697✔
10
    end
11

12
    # Parses command line arguments to configure the publisher.
13
    # @param args [Array<String>] The arguments passed via the command line.
14
    # @return [Hash] The parsed options including configuration path, environment,
15
    # buffer size, batch_size, and intervals.
16
    def self.parse_cli_options(args)
1✔
17
      options = {}
14✔
18

19
      parser = ::OptionParser.new do |opts|
14✔
20
        opts.banner = "Usage: outboxer_publisher [options]"
14✔
21

22
        opts.on("--environment ENV", "Application environment") do |v|
14✔
23
          options[:environment] = v
1✔
24
        end
25

26
        opts.on("--batch-size SIZE", Integer, "Batch size") do |v|
14✔
27
          options[:batch_size] = v
1✔
28
        end
29

30
        opts.on("--concurrency N", Integer, "Number of threads to publish messages") do |v|
14✔
31
          options[:concurrency] = v
1✔
32
        end
33

34
        opts.on("--tick-interval SECS", Float, "Tick interval in seconds") do |v|
14✔
35
          options[:tick_interval] = v
1✔
36
        end
37

38
        opts.on("--poll-interval SECS", Float, "Poll interval in seconds") do |v|
14✔
39
          options[:poll_interval] = v
1✔
40
        end
41

42
        opts.on("--heartbeat-interval SECS", Float, "Heartbeat interval in seconds") do |v|
14✔
43
          options[:heartbeat_interval] = v
1✔
44
        end
45

46
        opts.on("--sweep-interval SECS", Float, "Sweep interval in seconds") do |v|
14✔
47
          options[:sweep_interval] = v
1✔
48
        end
49

50
        opts.on("--sweep-retention SECS", Float, "Sweep retention in seconds") do |v|
14✔
51
          options[:sweep_retention] = v
1✔
52
        end
53

54
        opts.on("--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v|
14✔
55
          options[:sweep_batch_size] = v
1✔
56
        end
57

58
        opts.on("--log-level LEVEL", Integer, "Log level") do |v|
14✔
59
          options[:log_level] = v
1✔
60
        end
61

62
        opts.on("--config PATH", "Path to YAML config file") do |v|
14✔
63
          options[:config] = v
1✔
64
        end
65

66
        opts.on("--version", "Print version and exit") do
14✔
67
          puts "Outboxer version #{Outboxer::VERSION}"
1✔
68
          exit
1✔
69
        end
70

71
        opts.on("--help", "Show this help message") do
14✔
72
          puts opts
1✔
73
          exit
1✔
74
        end
75
      end
76

77
      parser.parse!(args)
14✔
78

79
      options
12✔
80
    end
81

82
    CONFIG_DEFAULTS = {
83
      path: "config/outboxer.yml",
1✔
84
      enviroment: "development"
85
    }
86

87
    # Loads and processes the YAML configuration for the publisher.
88
    # @param environment [String] The application environment.
89
    # @param path [String] The path to the configuration file.
90
    # @return [Hash] The processed configuration data with environment-specific overrides.
91
    def config(
1✔
92
      environment: CONFIG_DEFAULTS[:environment],
93
      path: CONFIG_DEFAULTS[:path]
94
    )
UNCOV
95
      path_expanded = ::File.expand_path(path)
×
UNCOV
96
      text = File.read(path_expanded)
×
UNCOV
97
      erb = ERB.new(text, trim_mode: "-")
×
UNCOV
98
      erb.filename = path_expanded
×
UNCOV
99
      erb_result = erb.result
×
100

UNCOV
101
      yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true)
×
UNCOV
102
      yaml.deep_symbolize_keys!
×
UNCOV
103
      yaml_override = yaml.fetch(environment&.to_sym, {}).slice(*PUBLISH_MESSAGES_DEFAULTS.keys)
×
UNCOV
104
      yaml.slice(*PUBLISH_MESSAGES_DEFAULTS.keys).merge(yaml_override)
×
105
    rescue Errno::ENOENT
UNCOV
106
      {}
×
107
    end
108

109
    # Retrieves publisher data by ID including associated signals.
110
    # @param id [Integer] The ID of the publisher to find.
111
    # @return [Hash] Detailed information about the publisher including its signals.
112
    def find_by_id(id:)
1✔
113
      ActiveRecord::Base.connection_pool.with_connection do
3✔
114
        ActiveRecord::Base.transaction do
3✔
115
          publisher = Models::Publisher.includes(:signals).find_by!(id: id)
3✔
116

117
          {
118
            id: publisher.id,
2✔
119
            name: publisher.name,
120
            status: publisher.status,
121
            settings: publisher.settings,
122
            metrics: publisher.metrics,
123
            created_at: publisher.created_at.utc,
124
            updated_at: publisher.updated_at.utc,
125
            signals: publisher.signals.map do |signal|
126
              {
127
                id: signal.id,
2✔
128
                name: signal.name,
129
                created_at: signal.created_at.utc
130
              }
131
            end
132
          }
133
        end
134
      end
135
    end
136

137
    # Retrieves all publishers including signals.
138
    # @return [Array<Hash>] A list of all publishers and their details.
139
    def all
1✔
140
      ActiveRecord::Base.connection_pool.with_connection do
9✔
141
        ActiveRecord::Base.transaction do
9✔
142
          publishers = Models::Publisher.includes(:signals).all
9✔
143

144
          publishers.map do |publisher|
9✔
145
            {
146
              id: publisher.id,
4✔
147
              name: publisher.name,
148
              status: publisher.status,
149
              settings: publisher.settings,
150
              metrics: publisher.metrics,
151
              created_at: publisher.created_at.utc,
152
              updated_at: publisher.updated_at.utc,
153
              signals: publisher.signals.map do |signal|
154
                {
155
                  id: signal.id,
4✔
156
                  name: signal.name,
157
                  created_at: signal.created_at.utc
158
                }
159
              end
160
            }
161
          end
162
        end
163
      end
164
    end
165

166
    # Creates a new publisher with specified settings and metrics.
167
    # @param name [String] The name of the publisher.
168
    # @param batch_size [Integer] The batch size.
169
    # @param concurrency [Integer] The number of publishing threads.
170
    # @param tick_interval [Float] The tick interval in seconds.
171
    # @param poll_interval [Float] The poll interval in seconds.
172
    # @param heartbeat_interval [Float] The heartbeat interval in seconds.
173
    # @param time [Time] The current time context for timestamping.
174
    # @return [Hash] Details of the created publisher.
175
    def create(name:, batch_size:, concurrency:,
1✔
176
               tick_interval:, poll_interval:, heartbeat_interval:,
177
               sweep_interval:, sweep_retention:, sweep_batch_size:,
178
               time: ::Time)
179
      ActiveRecord::Base.connection_pool.with_connection do
15✔
180
        ActiveRecord::Base.transaction do
15✔
181
          current_utc_time = time.now.utc
15✔
182

183
          publisher = Models::Publisher.create!(
15✔
184
            name: name,
185
            status: Status::PUBLISHING,
186
            settings: {
187
              "batch_size" => batch_size,
188
              "concurrency" => concurrency,
189
              "tick_interval" => tick_interval,
190
              "poll_interval" => poll_interval,
191
              "heartbeat_interval" => heartbeat_interval,
192
              "sweep_interval" => sweep_interval,
193
              "sweep_retention" => sweep_retention,
194
              "sweep_batch_size" => sweep_batch_size
195
            },
196
            metrics: {
197
              "throughput" => 0,
198
              "latency" => 0,
199
              "cpu" => 0,
200
              "rss " => 0,
201
              "rtt" => 0
202
            },
203
            created_at: current_utc_time,
204
            updated_at: current_utc_time)
205

206
          @status = Status::PUBLISHING
15✔
207

208
          {
209
            id: publisher.id,
15✔
210
            name: publisher.name,
211
            status: publisher.status,
212
            settings: publisher.settings,
213
            metrics: publisher.metrics,
214
            created_at: publisher.created_at,
215
            updated_at: publisher.updated_at
216
          }
217
        end
218
      end
219
    end
220

221
    # Deletes a publisher by ID, including all associated signals.
222
    # @param id [Integer] The ID of the publisher to delete.
223
    def delete(id:)
1✔
224
      ActiveRecord::Base.connection_pool.with_connection do
17✔
225
        ActiveRecord::Base.transaction do
17✔
226
          publisher = Models::Publisher.lock.find_by!(id: id)
17✔
227
          publisher.signals.destroy_all
17✔
228
          publisher.destroy!
17✔
229
        rescue ActiveRecord::RecordNotFound
230
          # no op
231
        end
232
      end
233
    end
234

235
    Status = Models::Publisher::Status
1✔
236

237
    # Stops the publishing operations for a specified publisher.
238
    # @param id [Integer] The ID of the publisher to stop.
239
    # @param time [Time] The current time context for timestamping.
240
    def stop(id:, time: ::Time)
1✔
241
      ActiveRecord::Base.connection_pool.with_connection do
2✔
242
        ActiveRecord::Base.transaction do
2✔
243
          publisher = Models::Publisher.lock.find(id)
2✔
244
          publisher.update!(status: Status::STOPPED, updated_at: time.now.utc)
2✔
245

246
          @status = Status::STOPPED
2✔
247
        end
248
      end
249
    end
250

251
    # Resumes the publishing operations for a specified publisher.
252
    # @param id [Integer] The ID of the publisher to resume.
253
    # @param time [Time] The current time context for timestamping.
254
    def continue(id:, time: ::Time)
1✔
255
      ActiveRecord::Base.connection_pool.with_connection do
1✔
256
        ActiveRecord::Base.transaction do
1✔
257
          publisher = Models::Publisher.lock.find(id)
1✔
258

259
          publisher.update!(status: Status::PUBLISHING, updated_at: time.now.utc)
1✔
260

261
          @status = Status::PUBLISHING
1✔
262
        end
263
      end
264
    end
265

266
    # Terminates the publishing operations for a specified publisher.
267
    # @param id [Integer] The ID of the publisher to terminate.
268
    # @param time [Time] The current time context for timestamping.
269
    def terminate(id:, time: ::Time)
1✔
270
      ActiveRecord::Base.connection_pool.with_connection do
16✔
271
        ActiveRecord::Base.transaction do
16✔
272
          publisher = Models::Publisher.lock.find(id)
16✔
273
          publisher.update!(status: Status::TERMINATING, updated_at: time.now.utc)
15✔
274

275
          @status = Status::TERMINATING
15✔
276
        rescue ActiveRecord::RecordNotFound
277
          @status = Status::TERMINATING
1✔
278
        end
279
      end
280
    end
281

282
    def signal(id:, name:, time: ::Time)
1✔
283
      ActiveRecord::Base.connection_pool.with_connection do
2✔
284
        ActiveRecord::Base.transaction do
2✔
285
          publisher = Models::Publisher.lock.find(id)
2✔
286
          publisher.signals.create!(name: name, created_at: time.now.utc)
2✔
287

288
          nil
2✔
289
        end
290
      end
291
    end
292

293
    # Sleeps for the specified duration or until terminating, in tick-sized intervals.
294
    # @param duration [Float] The maximum duration to sleep in seconds.
295
    # @param tick_interval [Float] Interval between termination checks.
296
    # @param process [Process] Module used for monotonic clock timing.
297
    # @param kernel [Kernel] Module used for sleep operations.
298
    def sleep(duration, tick_interval:, process:, kernel:)
1✔
299
      start_time = process.clock_gettime(process::CLOCK_MONOTONIC)
40✔
300

301
      while !terminating? &&
40✔
302
            (process.clock_gettime(process::CLOCK_MONOTONIC) - start_time) < duration
91✔
303
        kernel.sleep(tick_interval)
88✔
304
      end
305
    end
306

307
    def trap_signals
1✔
308
      signal_read, signal_write = IO.pipe
15✔
309

310
      %w[TTIN TSTP CONT INT TERM].each do |signal_name|
15✔
311
        old_handler = ::Signal.trap(signal_name) do
75✔
312
          old_handler.call if old_handler.respond_to?(:call)
102✔
313

314
          signal_write.puts(signal_name)
102✔
315
        end
316
      end
317

318
      [signal_read, signal_write]
15✔
319
    end
320

321
    # Creates a new thread that manages heartbeat checks, providing system metrics and
322
    # handling the first signal in the queue.
323
    # @param id [Integer] The ID of the publisher.
324
    # @param heartbeat_interval [Float] The interval in seconds between heartbeats.
325
    # @param tick_interval [Float] The base interval in seconds for sleeping during the loop.
326
    # @param logger [Logger] A logger for logging heartbeat and system status.
327
    # @param time [Time] Current time context.
328
    # @param process [Process] The process module for accessing system metrics.
329
    # @param kernel [Kernel] The kernel module for sleeping operations.
330
    # @return [Thread] The heartbeat thread.
331
    def create_heartbeat_thread(id:,
1✔
332
                                heartbeat_interval:, tick_interval:,
333
                                logger:, time:, process:, kernel:)
334
      Thread.new do
15✔
335
        Thread.current.name = "heartbeat"
15✔
336

337
        while !terminating?
15✔
338
          begin
339
            cpu = `ps -p #{process.pid} -o %cpu`.split("\n").last.to_f
15✔
340
            rss = `ps -p #{process.pid} -o rss`.split("\n").last.to_i
15✔
341

342
            ActiveRecord::Base.connection_pool.with_connection do
15✔
343
              ActiveRecord::Base.transaction do
15✔
344
                start_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
15✔
345

346
                publisher = Models::Publisher.lock.find(id)
15✔
347

348
                end_rtt = process.clock_gettime(process::CLOCK_MONOTONIC)
15✔
349
                rtt = end_rtt - start_rtt
15✔
350

351
                signal = publisher.signals.order(created_at: :asc).first
15✔
352

353
                if !signal.nil?
15✔
354
                  handle_signal(id: id, name: signal.name, logger: logger)
×
355
                  signal.destroy
×
356
                end
357

358
                throughput = Models::Message
15✔
359
                  .where(status: Message::Status::PUBLISHED)
360
                  .where(publisher_id: id)
361
                  .where("published_at >= ?", 1.second.ago)
362
                  .count
363

364
                last_published_message = Models::Message
15✔
365
                  .where(status: Message::Status::PUBLISHED)
366
                  .where(publisher_id: id)
367
                  .order(published_at: :desc)
368
                  .first
369

370
                latency = if last_published_message.nil?
15✔
371
                            0
13✔
372
                          else
373
                            (Time.now.utc - last_published_message.published_at).to_i
2✔
374
                          end
375

376
                publisher.update!(
15✔
377
                  updated_at: time.now.utc,
378
                  metrics: {
379
                    throughput: throughput,
380
                    latency: latency,
381
                    cpu: cpu,
382
                    rss: rss,
383
                    rtt: rtt
384
                  })
385
              end
386
            end
387

388
            Publisher.sleep(
15✔
389
              heartbeat_interval,
390
              tick_interval: tick_interval,
391
              process: process, kernel: kernel)
392
          rescue ActiveRecord::RecordNotFound => error
393
            logger.fatal(
×
394
              "#{error.class}: #{error.message}\n" \
395
              "#{error.backtrace.join("\n")}")
396

397
            terminate(id: id)
×
398
          rescue StandardError => error
399
            logger.error(
×
400
              "#{error.class}: #{error.message}\n" \
401
              "#{error.backtrace.join("\n")}")
402

403
            Publisher.sleep(
×
404
              heartbeat_interval,
405
              tick_interval: tick_interval, process: process, kernel: kernel)
406
          rescue ::Exception => error
407
            logger.fatal(
×
408
              "#{error.class}: #{error.message}\n" \
409
              "#{error.backtrace.join("\n")}")
410

411
            terminate(id: id)
×
412
          end
413
        end
414
      end
415
    end
416

417
    # Handles received signals and takes appropriate actions
418
    # such as pausing, resuming, or terminating the publisher.
419
    # @param id [Integer] The ID of the publisher affected by the signal.
420
    # @param name [String] The name of the signal received.
421
    # @param logger [Logger] The logger to record actions taken in response to the signal.
422
    def handle_signal(id:, name:, logger:)
1✔
423
      case name
12✔
424
      when "TTIN"
425
        Thread.list.each do |thread|
1✔
426
          thread_name = thread.name || thread.object_id
6✔
427
          backtrace = thread.backtrace || ["<no backtrace available>"]
6✔
428
          logger.info(
6✔
429
            "Outboxer dumping thread #{thread_name}\n" +
430
              backtrace.join("\n"))
431
        end
432
      when "TSTP"
433
        logger.info("Outboxer pausing threads")
1✔
434

435
        begin
436
          stop(id: id)
1✔
437
        rescue ActiveRecord::RecordNotFound => error
×
438
          logger.fatal(
×
439
            "#{error.class}: #{error.message}\n" \
440
            "#{error.backtrace.join("\n")}")
441

442
          terminate(id: id)
×
443
        end
444
      when "CONT"
445
        logger.info("Outboxer resuming threads")
1✔
446

447
        begin
448
          continue(id: id)
1✔
449
        rescue ActiveRecord::RecordNotFound => error
×
450
          logger.fatal(
×
451
            "#{error.class}: #{error.message}\n" \
452
            "#{error.backtrace.join("\n")}")
453

454
          terminate(id: id)
×
455
        end
456
      when "INT", "TERM"
457
        logger.info("Outboxer terminating threads")
9✔
458

459
        terminate(id: id)
9✔
460
      end
461
    end
462

463
    PUBLISH_MESSAGES_DEFAULTS = {
1✔
464
      batch_size: 1000,
465
      concurrency: 1,
466
      tick_interval: 0.1,
467
      poll_interval: 5.0,
468
      heartbeat_interval: 5.0,
469
      sweep_interval: 60,
470
      sweep_retention: 60,
471
      sweep_batch_size: 100,
472
      log_level: 1
473
    }
474

475
    # Publish queued messages concurrently
476
    # @param name [String] The name of the publisher.
477
    # @param batch_size [Integer] The batch size.
478
    # @param concurrency [Integer] The number of publisher threads.
479
    # @param tick_interval [Float] The tick interval in seconds.
480
    # @param poll_interval [Float] The poll interval in seconds.
481
    # @param heartbeat_interval [Float] The heartbeat interval in seconds.
482
    # @param sweep_interval [Float] The interval in seconds between sweeper runs.
483
    # @param sweep_retention [Float] The retention period in seconds for published messages.
484
    # @param sweep_batch_size [Integer] The maximum number of messages to delete per batch.
485
    # @param logger [Logger] Logger for recording publishing activities.
486
    # @param time [Time] The current time context.
487
    # @param process [Process] The process module for system metrics.
488
    # @param kernel [Kernel] The kernel module for sleeping operations.
489
    # @yield [publisher, messages] Yields publisher and messages to be published.
490
    # @yieldparam publisher [Hash] A hash with keys `:id` and `:name` representing the publisher.
491
    # @yieldparam messages [Array<Hash>] An array of message hashes retrieved from the buffer.
492
    def publish_messages(
1✔
493
      name: "#{::Socket.gethostname}:#{::Process.pid}",
494
      batch_size: PUBLISH_MESSAGES_DEFAULTS[:batch_size],
495
      concurrency: PUBLISH_MESSAGES_DEFAULTS[:concurrency],
496
      tick_interval: PUBLISH_MESSAGES_DEFAULTS[:tick_interval],
497
      poll_interval: PUBLISH_MESSAGES_DEFAULTS[:poll_interval],
498
      heartbeat_interval: PUBLISH_MESSAGES_DEFAULTS[:heartbeat_interval],
499
      sweep_interval: PUBLISH_MESSAGES_DEFAULTS[:sweep_interval],
500
      sweep_retention: PUBLISH_MESSAGES_DEFAULTS[:sweep_retention],
501
      sweep_batch_size: PUBLISH_MESSAGES_DEFAULTS[:sweep_batch_size],
502
      logger: Logger.new($stdout, level: PUBLISH_MESSAGES_DEFAULTS[:log_level]),
503
      time: ::Time, process: ::Process, kernel: ::Kernel,
504
      &block
505
    )
506
      logger.info "Outboxer v#{Outboxer::VERSION} running in ruby #{RUBY_VERSION} " \
15✔
507
        "(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]"
508

509
      logger.info "Outboxer config " \
15✔
510
        "batch_size=#{batch_size}, " \
511
        "concurrency=#{concurrency}, " \
512
        "tick_interval=#{tick_interval} " \
513
        "poll_interval=#{poll_interval}, " \
514
        "heartbeat_interval=#{heartbeat_interval}, " \
515
        "sweep_interval=#{sweep_interval}, " \
516
        "sweep_retention=#{sweep_retention}, " \
517
        "sweep_batch_size=#{sweep_batch_size}, " \
518
        "log_level=#{logger.level}"
519

520
      Setting.create_all
15✔
521

522
      publisher = create(
15✔
523
        name: name, batch_size: batch_size,
524
        concurrency: concurrency,
525
        tick_interval: tick_interval, poll_interval: poll_interval,
526
        heartbeat_interval: heartbeat_interval,
527
        sweep_interval: sweep_interval,
528
        sweep_retention: sweep_retention,
529
        sweep_batch_size: sweep_batch_size)
530

531
      heartbeat_thread = create_heartbeat_thread(
15✔
532
        id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval,
533
        logger: logger, time: time, process: process, kernel: kernel)
534

535
      sweeper_thread = create_sweeper_thread(
15✔
536
        id: publisher[:id],
537
        sweep_interval: sweep_interval,
538
        sweep_retention: sweep_retention,
539
        sweep_batch_size: sweep_batch_size,
540
        tick_interval: tick_interval,
541
        logger: logger,
542
        time: time,
543
        process: process,
544
        kernel: kernel)
545

546
      publisher_threads = Array.new(concurrency) do |index|
15✔
547
        create_publisher_thread(
15✔
548
          id: publisher[:id], name: name, index: index, batch_size: batch_size,
549
          poll_interval: poll_interval, tick_interval: tick_interval,
550
          logger: logger, process: process, kernel: kernel, &block)
551
      end
552

553
      signal_read, _signal_write = trap_signals
15✔
554

555
      while !terminating?
15✔
556
        if signal_read.wait_readable(tick_interval)
42✔
557
          handle_signal(id: publisher[:id], name: signal_read.gets.chomp, logger: logger)
12✔
558
        end
559
      end
560

561
      logger.info "Outboxer terminating"
15✔
562

563
      publisher_threads.each(&:join)
15✔
564

565
      heartbeat_thread.join
15✔
566
      sweeper_thread.join
15✔
567

568
      delete(id: publisher[:id])
15✔
569

570
      logger.info "Outboxer terminated"
15✔
571
    end
572

573
    def pool(concurrency:)
1✔
574
      concurrency + 3 # (main + heartbeat + sweeper)
×
575
    end
576

577
    # @param id [Integer] Publisher id.
578
    # @param name [String] Publisher name.
579
    # @param batch_size [Integer] Max number of messages per batch.
580
    # @param index [Integer] Zero-based thread index (used for thread name).
581
    # @param poll_interval [Numeric] Seconds to wait when no messages found.
582
    # @param tick_interval [Numeric] Seconds between signal checks during sleep.
583
    # @param logger [Logger] Logger used for info/error/fatal messages.
584
    # @param process [Object] Process-like object passed to `Publisher.sleep`.
585
    # @param kernel [Object] Kernel object passed to `Publisher.sleep`.
586
    # @yieldparam publisher [Hash{Symbol=>Integer,String}] Publisher details,
587
    #   e.g., `{ id: Integer, name: String }`.
588
    # @yieldparam messages [Array<Hash>] Batch of messages to publish.
589
    # @return [Thread] The created publishing thread.
590
    def create_publisher_thread(id:, name:, batch_size:, index:,
1✔
591
                                poll_interval:, tick_interval:,
592
                                logger:, process:, kernel:, &block)
593
      Thread.new do
15✔
594
        begin
595
          Thread.current.name = "publisher-#{index + 1}"
15✔
596

597
          while !terminating?
15✔
598
            begin
599
              messages = buffer_messages(id: id, name: name, limit: batch_size)
22✔
600

601
              if messages.any?
20✔
602
                block.call({ id: id, name: name }, messages)
8✔
603
              else
604
                Publisher.sleep(
12✔
605
                  poll_interval,
606
                  tick_interval: tick_interval,
607
                  process: process,
608
                  kernel: kernel)
609
              end
610
            rescue StandardError => error
611
              logger.error(
3✔
612
                "#{error.class}: #{error.message}\n" \
613
                "#{error.backtrace.join("\n")}")
614
            end
615
          end
616
        rescue ::Exception => error
617
          logger.fatal(
3✔
618
            "#{error.class}: #{error.message}\n" \
619
            "#{error.backtrace.join("\n")}")
620

621
          terminate(id: id)
3✔
622
        ensure
623
          logger.info "#{Thread.current.name} shutting down"
15✔
624
        end
625
      end
626
    end
627

628
    # Creates a sweeper thread to periodically delete old published messages for a publisher.
629
    #
630
    # @param id [Integer] The ID of the publisher.
631
    # @param sweep_interval [Float] Time in seconds between each sweep run.
632
    # @param sweep_retention [Float] Retention period in seconds for keeping published messages.
633
    # @param sweep_batch_size [Integer] Max number of messages to delete in each sweep.
634
    # @param tick_interval [Float] Time in seconds between signal checks while sleeping.
635
    # @param logger [Logger] Logger instance to report errors and fatal issues.
636
    # @param time [Time] Module used for fetching current UTC time.
637
    # @param process [Process] Process module used for monotonic clock timings.
638
    # @param kernel [Kernel] Kernel module used for sleeping between sweeps.
639
    # @return [Thread] The thread executing the sweeping logic.
640
    def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_size:,
1✔
641
                              tick_interval:, logger:, time:, process:, kernel:)
642
      Thread.new do
15✔
643
        Thread.current.name = "sweeper"
15✔
644

645
        while !terminating?
15✔
646
          begin
647
            deleted_count = Message.delete_batch(
107,432✔
648
              status: Message::Status::PUBLISHED,
649
              older_than: time.now.utc - sweep_retention,
650
              batch_size: sweep_batch_size)[:deleted_count]
651

652
            if deleted_count == 0
14✔
653
              Publisher.sleep(
13✔
654
                sweep_interval,
655
                tick_interval: tick_interval,
656
                process: process,
657
                kernel: kernel)
658
            end
659
          rescue StandardError => error
660
            logger.error(
107,416✔
661
              "#{error.class}: #{error.message}\n" \
662
              "#{error.backtrace.join("\n")}")
663
          rescue ::Exception => error
664
            logger.fatal(
2✔
665
              "#{error.class}: #{error.message}\n" \
666
              "#{error.backtrace.join("\n")}")
667

668
            terminate(id: id)
2✔
669
          end
670
        end
671
      end
672
    end
673

674
    # Marks queued messages as buffered.
675
    #
676
    # @param id [Integer] the ID of the publisher.
677
    # @param name [String] the name of the publisher.
678
    # @param limit [Integer] the number of messages to buffer.
679
    # @param time [Time] current time context used to update timestamps.
680
    # @return [Array<Hash>] buffered messages.
681
    def buffer_messages(id:, name:, limit: 1, time: ::Time)
1✔
682
      current_utc_time = time.now.utc
21✔
683
      messages = []
21✔
684

685
      ActiveRecord::Base.connection_pool.with_connection do
21✔
686
        ActiveRecord::Base.transaction do
21✔
687
          messages = Models::Message
21✔
688
            .where(status: Message::Status::QUEUED)
689
            .order(updated_at: :asc)
690
            .limit(limit)
691
            .lock("FOR UPDATE SKIP LOCKED")
692
            .pluck(:id, :messageable_type, :messageable_id)
693

694
          message_ids = messages.map(&:first)
21✔
695

696
          updated_rows = Models::Message
21✔
697
            .where(id: message_ids, status: Message::Status::QUEUED)
698
            .update_all(
699
              status: Message::Status::PUBLISHING,
700
              updated_at: current_utc_time,
701
              buffered_at: current_utc_time,
702
              publisher_id: id,
703
              publisher_name: name)
704

705
          if updated_rows != message_ids.size
21✔
706
            raise ArgumentError, "Some messages not buffered"
×
707
          end
708
        end
709
      end
710

711
      messages.map do |message_id, messageable_type, messageable_id|
21✔
712
        {
713
          id: message_id,
10✔
714
          messageable_type: messageable_type,
715
          messageable_id: messageable_id
716
        }
717
      end
718
    end
719

720
    # Updates messages as published or failed.
721
    #
722
    # @param id [Integer]
723
    # @param published_message_ids [Array<Integer>]
724
    # @param failed_messages [Array<Hash>] Array of failed message hashes:
725
    #   [
726
    #     {
727
    #       id: Integer,
728
    #       exception: {
729
    #         class_name: String,
730
    #         message_text: String,
731
    #         backtrace: Array<String>
732
    #       }
733
    #     }
734
    #   ]
735
    # @param time [Time]
736
    # @return [nil]
737
    def update_messages(id:, published_message_ids: [], failed_messages: [],
1✔
738
                        time: ::Time)
739
      current_utc_time = time.now.utc
8✔
740

741
      ActiveRecord::Base.connection_pool.with_connection do
8✔
742
        ActiveRecord::Base.transaction do
8✔
743
          if published_message_ids.any?
8✔
744
            messages = Models::Message
7✔
745
              .where(id: published_message_ids, status: Message::Status::PUBLISHING)
746
              .lock("FOR UPDATE")
747
              .pluck(:id)
748

749
            if messages.size != published_message_ids.size
7✔
750
              raise ArgumentError, "Some messages publishing not locked for update"
1✔
751
            end
752

753
            updated_rows = Models::Message
6✔
754
              .where(status: Status::PUBLISHING, id: published_message_ids)
755
              .update_all(
756
                status: Message::Status::PUBLISHED,
757
                updated_at: current_utc_time,
758
                published_at: current_utc_time,
759
                publisher_id: id)
760

761
            if updated_rows != published_message_ids.size
6✔
762
              raise ArgumentError, "Some messages publishing not updated to published"
×
763
            end
764
          end
765

766
          failed_messages.each do |failed_message|
7✔
767
            message = Models::Message
5✔
768
              .lock("FOR UPDATE")
769
              .find_by!(id: failed_message[:id], status: Message::Status::PUBLISHING)
770

771
            message.update!(status: Message::Status::FAILED, updated_at: current_utc_time)
5✔
772

773
            exception = message.exceptions.create!(
5✔
774
              class_name: failed_message[:exception][:class_name],
775
              message_text: failed_message[:exception][:message_text],
776
              created_at: current_utc_time)
777

778
            (failed_message[:exception][:backtrace] || []).each_with_index do |frame, index|
5✔
779
              exception.frames.create!(index: index, text: frame)
12✔
780
            end
781
          end
782
        end
783
      end
784

785
      nil
786
    end
787
  end
788
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc