class Concurrent::Actor::Utils::Balancer

Distributes messages among subscribed actors. Each subscribed actor receives exactly one message and is then unsubscribed; it must resubscribe when it is ready to receive the next message. Messages are buffered while no actor is subscribed. @see Pool
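The subscribe, deliver, resubscribe cycle described above can be sketched in plain Ruby. This is a simplified model for illustration only, not the library class: `SketchBalancer`, `push`, and `deliveries` are hypothetical names, workers are plain symbols rather than actors, and deliveries are recorded in an array instead of being redirected to an actor.

```ruby
# Simplified stand-in for the balancer's bookkeeping; NOT the library class.
class SketchBalancer
  attr_reader :deliveries

  def initialize
    @receivers  = [] # workers waiting for one message each
    @buffer     = [] # messages waiting for a worker
    @deliveries = [] # [worker, message] pairs, recorded instead of sent
  end

  # A worker announces it is ready for exactly one message.
  def subscribe(worker)
    @receivers << worker
    distribute
  end

  def subscribed?(worker)
    @receivers.include?(worker)
  end

  # Hypothetical entry point for ordinary (non-command) messages.
  def push(message)
    @buffer << message
    distribute
  end

  private

  # Pair waiting workers with buffered messages, oldest first.
  def distribute
    until @receivers.empty? || @buffer.empty?
      @deliveries << [@receivers.shift, @buffer.shift]
    end
  end
end

balancer = SketchBalancer.new
balancer.push(:job1)          # no worker yet, so the message is buffered
balancer.push(:job2)
balancer.subscribe(:worker_a) # receives :job1 and is unsubscribed
balancer.subscribe(:worker_b) # receives :job2
balancer.subscribe(:worker_a) # resubscribes and waits
balancer.push(:job3)          # goes straight to :worker_a
```

Because a worker is removed from the receiver list as soon as it is handed a message, a slow worker never accumulates a backlog; it only gets more work once it asks again.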

Public Class Methods

new()
# File lib/concurrent/actor/utils/balancer.rb, line 11
def initialize
  @receivers = []
  @buffer    = []
end

Public Instance Methods

distribute()
# File lib/concurrent/actor/utils/balancer.rb, line 35
def distribute
  while !@receivers.empty? && !@buffer.empty?
    redirect @receivers.shift, @buffer.shift
  end
end

on_message(message)
# File lib/concurrent/actor/utils/balancer.rb, line 16
def on_message(message)
  command, who = message
  case command
  when :subscribe
    @receivers << (who || envelope.sender)
    distribute
    true
  when :unsubscribe
    @receivers.delete(who || envelope.sender)
    true
  when :subscribed?
    @receivers.include?(who || envelope.sender)
  else
    @buffer << envelope
    distribute
    Behaviour::MESSAGE_PROCESSED
  end
end
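The first line of on_message relies on Ruby's multiple assignment, so a command can arrive either as a bare symbol or as a two-element array that names the actor. The sketch below isolates that behaviour; `split_message` is a hypothetical helper, and symbols stand in for actor references.

```ruby
# Hypothetical helper mirroring `command, who = message` from on_message.
def split_message(message)
  command, who = message # a non-array value lands in command; who is nil
  [command, who]
end

bare    = split_message(:subscribe)            # who is nil, so the sender is used
named   = split_message([:subscribe, :worker]) # who names the actor explicitly
payload = split_message("some work")           # not a command, so it is buffered
```

When `who` is nil, the method above falls back to `envelope.sender`, which is why an actor can subscribe itself by sending just `:subscribe`.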