class Concurrent::Actor::Utils::Balancer
Distributes messages between subscribed actors. Each actor receives only one message and is then unsubscribed; the actor needs to resubscribe when it is ready to receive the next message. The balancer buffers messages while no worker is registered. @see Pool
Public Class Methods
new()
click to toggle source
# File lib/concurrent/actor/utils/balancer.rb, line 11
#
# Sets up the balancer with no subscribed receivers and no buffered
# envelopes.
def initialize
  # Receivers waiting to be handed a message, in subscription order.
  @receivers = Array.new
  # Envelopes that arrived while no receiver was available.
  @buffer = Array.new
end
Public Instance Methods
distribute()
click to toggle source
# File lib/concurrent/actor/utils/balancer.rb, line 35
#
# Pairs waiting receivers with buffered envelopes, oldest first on both
# sides, and hands each pair to +redirect+. Stops as soon as either queue
# runs dry, so leftovers stay queued for the next call.
def distribute
  until @receivers.empty? || @buffer.empty?
    next_receiver = @receivers.shift
    next_envelope = @buffer.shift
    redirect next_receiver, next_envelope
  end
end
on_message(message)
click to toggle source
# File lib/concurrent/actor/utils/balancer.rb, line 16
#
# Handles one incoming message. The message is destructured into a command
# and an optional actor reference; when the reference is absent the sender
# of the current envelope is used instead.
#
# Commands:
# * +:subscribe+   - queues the actor as a receiver, then tries to
#   distribute buffered envelopes; returns +true+.
# * +:unsubscribe+ - removes the actor from the receivers; returns +true+.
# * +:subscribed?+ - returns whether the actor is currently queued.
#
# Anything else is treated as a payload: the current envelope (not the bare
# message) is buffered, distribution is attempted, and
# +Behaviour::MESSAGE_PROCESSED+ is returned.
def on_message(message)
  action, actor = message

  case action
  when :subscribe
    subscriber = actor || envelope.sender
    @receivers.push(subscriber)
    distribute
    true
  when :unsubscribe
    leaver = actor || envelope.sender
    @receivers.delete(leaver)
    true
  when :subscribed?
    candidate = actor || envelope.sender
    @receivers.include?(candidate)
  else
    @buffer.push(envelope)
    distribute
    Behaviour::MESSAGE_PROCESSED
  end
end