class Riffer::Runner::Fibers
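Processes items concurrently using fibers via the async gem. When max_concurrency is given, it caps the number of simultaneously running fibers via an Async::Semaphore; otherwise every item gets its own fiber at once. Results are returned in input order. If multiple fibers raise, all fibers are still allowed to finish, and then only the exception belonging to the earliest failing item (by position in the input, not by time) is re-raised.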
Public Class Methods
Source
# File lib/riffer/runner/fibers.rb, line 13
# Creates a fiber-based runner and declares the async libraries it relies on.
#
# max_concurrency:: Optional limit that #map hands to Async::Semaphore.
#                   When nil (the default) no semaphore is created.
def initialize(max_concurrency: nil)
  # NOTE(review): depends_on is defined outside this view; presumably it
  # loads the named library and reports a helpful error if missing - confirm.
  depends_on "async"
  # #map builds an Async::Barrier on every call, with or without a
  # concurrency limit, and requiring "async" alone does not load it.
  depends_on "async/barrier"
  depends_on "async/semaphore" if max_concurrency
  @max_concurrency = max_concurrency
end
Public Instance Methods
Source
# File lib/riffer/runner/fibers.rb, line 21
# Calls +block+ once per item, each call in its own fiber, and returns the
# block results in the same order as +items+.
#
# items::   Collection responding to empty?, size and each_with_index.
# context:: Required keyword; accepted but not used by this implementation.
# block::   Invoked with a single item; its return value becomes the result
#           stored at that item's position.
#
# Returns an Array of results (an empty Array when +items+ is empty).
#
# Raises the StandardError rescued from the lowest-indexed failing item,
# but only after every fiber has finished.
def map(items, context:, &block)
  return [] if items.empty?

  results = Array.new(items.size)
  errors = Array.new(items.size)

  # Kernel#Async only blocks when it starts the outermost reactor. When this
  # method is called from inside an existing Async task it returns the child
  # task immediately, so we explicitly wait on the task; otherwise results
  # and errors would be read before the fibers have completed.
  Async do
    barrier = Async::Barrier.new
    limit = @max_concurrency

    # With a limit, fibers are spawned through a semaphore whose parent is
    # the barrier, so the barrier still tracks every spawned fiber.
    parent = limit ? Async::Semaphore.new(limit, parent: barrier) : barrier

    items.each_with_index do |item, index|
      parent.async do
        results[index] = block.call(item)
      rescue => e
        # Record the failure by position so sibling fibers keep running.
        errors[index] = e
      end
    end

    barrier.wait
  end.wait

  first_error = errors.compact.first
  raise first_error if first_error

  results
end