class Riffer::Runner::Threaded
Processes items concurrently using a thread pool.
Maintains up to max_concurrency worker threads that pull items from a shared queue. When a worker finishes one item it immediately picks up the next, so a single slow item does not block other workers.
If multiple workers raise, only the first exception is re-raised after all workers finish; subsequent errors are discarded.
runner = Riffer::Runner::Threaded.new(max_concurrency: 3) runner.map(items) { |item| expensive_operation(item) }
Constants
- DEFAULT_MAX_CONCURRENCY
Public Class Methods
Source
# File lib/riffer/runner/threaded.rb, line 23 def initialize(max_concurrency: DEFAULT_MAX_CONCURRENCY) @max_concurrency = max_concurrency end
- max_concurrency
-
maximum number of threads to run simultaneously.
Public Instance Methods
Source
# File lib/riffer/runner/threaded.rb, line 29 def map(items, context:, &block) return [] if items.empty? results = Array.new(items.size) errors = Array.new(items.size) queue = Queue.new items.each_with_index { |item, i| queue << [item, i] } workers = [items.size, @max_concurrency].min.times.map do Thread.new do loop do pair = begin queue.pop(true) rescue ThreadError break end item, index = pair begin results[index] = block.call(item) rescue => e errors[index] = e end end end end workers.each(&:join) first_error = errors.compact.first raise first_error if first_error results end