class Concurrent::Actor::Utils::Pool

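Creates a pool of workers and distributes work among them. @param [Integer] size number of workers @yield [balancer, index] a block spawning a worker instance; it is called `size` times. (Note: the `initialize` shown below builds the workers with `Array.new(size, &worker_initializer)`, so the block actually receives only the index, as in the example.)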

The worker should be a descendant of AbstractWorker and should be supervised; see the example.

@yieldparam [Balancer] balancer to pass to the worker @yieldparam [Integer] index of the worker, usually used in its name @yieldreturn [Reference] the reference of the newly created worker @example

# Example worker: prints every received message multiplied by 5.
class Worker < Concurrent::Actor::RestartingContext
  def on_message(message)
    # `p` prints the inspected value, so strings appear quoted in the output.
    p message * 5
  end
end

# Spawn the pool with 5 workers ('pool' is presumably the pool actor's name).
# The block receives the worker index and returns the spawned worker.
pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |index|
  Worker.spawn name: "worker-#{index}", supervise: true, args: []
end

# Send two messages to the pool; the expected output is listed below.
pool << 'asd' << 2
# prints:
# "asdasdasdasdasd"
# 10

Public Class Methods

new(size, &worker_initializer) click to toggle source
# File lib/concurrent/actor/utils/pool.rb, line 30
# Sets up the pool: spawns a balancer, builds the workers and subscribes
# every worker to the balancer.
#
# All workers are built first; only afterwards is each one checked and
# subscribed, in creation order.
#
# @param [Integer] size number of workers to build
# @yield [index] called once per worker by `Array.new`, with the zero-based
#   worker index; the block's result is stored as the worker
def initialize(size, &worker_initializer)
  @balancer = Balancer.spawn(name: :balancer, supervise: true)
  @workers  = ::Array.new(size, &worker_initializer)

  @workers.each do |reference|
    # Presumably raises unless the block returned a Reference -- see Type!.
    Type!(reference, Reference)
    @balancer << [:subscribe, reference]
  end
end

Public Instance Methods

on_message(message) click to toggle source
# File lib/concurrent/actor/utils/pool.rb, line 39
# Forwards an incoming message to the balancer.
#
# Messages whose command (the message itself, or its first element when it
# is an array) is one of :restarted, :reset, :resumed or :terminated are
# dropped, because they are events from supervised actors.
#
# Every other message is redirected to the balancer together with a future.
# The current envelope is reused when it already has a future; otherwise a
# new Envelope with the same message, sender and address and a fresh
# `Concurrent.future` is built. When that future completes, `:subscribe` is
# sent to the balancer.
#
# @param [Object] message the received message
# @return [nil, Object] nil for ignored events, otherwise the result of
#   `redirect`
def on_message(message)
  command, = message
  ignored_events = %i[restarted reset resumed terminated]
  return if ignored_events.include?(command) # ignore events from supervised actors

  envelope_to_redirect =
    envelope.future ? envelope : Envelope.new(envelope.message,
                                              Concurrent.future,
                                              envelope.sender,
                                              envelope.address)

  # TODO check safety of @balancer reading
  envelope_to_redirect.future.on_completion! do
    @balancer << :subscribe
  end

  redirect @balancer, envelope_to_redirect
end