class Riffer::Runner::Fibers
Processes items concurrently using fibers via the async gem.
All items run as fibers simultaneously by default. When max_concurrency is set, an Async::Semaphore limits how many fibers execute at once.
If multiple fibers raise, every fiber is still allowed to finish. The exception belonging to the earliest item in input order is then re-raised, and any other errors are discarded.
    runner = Riffer::Runner::Fibers.new
    runner.map(items) { |item| expensive_operation(item) }
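The error-handling rule described above can be sketched in plain Ruby. This is a minimal illustration only: `map_collecting_errors` is a hypothetical helper, it processes items sequentially rather than in fibers, and it does not use the async gem. It shows just the bookkeeping: errors are stored by item index, every item is attempted, and the error with the lowest index is raised at the end.

```ruby
# Hypothetical stand-in for the error bookkeeping in Riffer::Runner::Fibers#map.
# Items run sequentially here; no fibers or async gem are involved.
def map_collecting_errors(items)
  results = Array.new(items.size)
  errors = Array.new(items.size)

  items.each_with_index do |item, index|
    begin
      results[index] = yield(item)
    rescue => e
      # Record the failure and keep going so every item is attempted.
      errors[index] = e
    end
  end

  # The error with the lowest index wins; later errors are dropped.
  first_error = errors.compact.first
  raise first_error if first_error

  results
end

attempted = []
begin
  map_collecting_errors([1, 2, 3, 4]) do |n|
    attempted << n
    raise ArgumentError, "bad item #{n}" if n.even?
    n * 10
  end
rescue ArgumentError => e
  # Items 2 and 4 both fail, but only the error for item 2 surfaces here.
  puts "raised: #{e.message}, attempted: #{attempted.inspect}"
end
```

Because errors are picked by position rather than by time, the exception a caller sees does not depend on which item happened to fail first.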
Public Class Methods
Source
    # File lib/riffer/runner/fibers.rb, line 24
    def initialize(max_concurrency: nil)
      depends_on "async"
      depends_on "async/semaphore" if max_concurrency
      @max_concurrency = max_concurrency
    end
max_concurrency: Maximum number of fibers to run simultaneously. When nil, all fibers run without limit.
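To make the effect of `max_concurrency` concrete, here is a rough sketch of the same idea using only the Ruby standard library. It is an assumption-laden stand-in, not the library's implementation: `bounded_map` is a hypothetical helper that uses threads and a `SizedQueue` as a counting semaphore, whereas the class itself uses fibers and `Async::Semaphore`.

```ruby
# Hypothetical stand-in: caps concurrency with threads and a SizedQueue,
# not with the async gem's fibers and Async::Semaphore.
def bounded_map(items, max_concurrency: nil)
  results = Array.new(items.size)
  # A SizedQueue with N slots behaves like a counting semaphore:
  # push blocks once N workers already hold a slot.
  slots = max_concurrency ? SizedQueue.new(max_concurrency) : nil

  threads = items.each_with_index.map do |item, index|
    Thread.new do
      slots&.push(true) # acquire a slot (blocks at the limit)
      begin
        results[index] = yield(item)
      ensure
        slots&.pop # release the slot
      end
    end
  end

  threads.each(&:join)
  results
end

running = 0
peak = 0
lock = Mutex.new

squares = bounded_map((1..6).to_a, max_concurrency: 2) do |n|
  lock.synchronize do
    running += 1
    peak = [peak, running].max
  end
  sleep 0.01 # simulate work so that workers overlap
  lock.synchronize { running -= 1 }
  n * n
end

puts "results: #{squares.inspect}, peak concurrency: #{peak}"
```

With a limit of 2, the observed peak never exceeds 2, while results still come back in input order. Passing `max_concurrency: nil` skips the gate entirely, which mirrors the unlimited default described above.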
Public Instance Methods
Source
    # File lib/riffer/runner/fibers.rb, line 32
    def map(items, context:, &block)
      return [] if items.empty?

      results = Array.new(items.size)
      errors = Array.new(items.size)

      Async do
        barrier = Async::Barrier.new
        parent = if @max_concurrency
          Async::Semaphore.new(@max_concurrency, parent: barrier)
        else
          barrier
        end

        items.each_with_index do |item, index|
          parent.async do
            results[index] = block.call(item)
          rescue => e
            errors[index] = e
          end
        end

        barrier.wait
      end

      first_error = errors.compact.first
      raise first_error if first_error

      results
    end