class Concurrent::Actor::Behaviour::Buffer
Any message reaching this behaviour is buffered. Only one message is scheduled at any given time; the others are kept in the buffer until another one can be scheduled. This effectively means that messages handled by behaviours placed before the buffer have higher priority and can be processed before messages arriving in the buffer. This allows internal actor messages (such as `:link` and `:supervise`) to be processed first.
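The one-at-a-time rule described above can be sketched with a small standalone model. It does not load concurrent-ruby: `TinyScheduler` and `BufferSketch` are hypothetical names invented for this illustration, and the handler block stands in for whatever the subsequent behaviours would do with a message. The sketch assumes a single-threaded FIFO scheduler.

```ruby
# Standalone model; it does not load concurrent-ruby.
# TinyScheduler and BufferSketch are hypothetical names used only here.
class TinyScheduler
  def initialize
    @jobs = []
  end

  # Queue a block to run later, in FIFO order.
  def schedule(&block)
    @jobs << block
  end

  # Run queued blocks, including ones queued while running, until none remain.
  def run
    @jobs.shift.call until @jobs.empty?
  end
end

class BufferSketch
  def initialize(scheduler, &handler)
    @scheduler = scheduler
    @handler   = handler
    @buffer    = []
    @scheduled = false
  end

  # Every incoming message is buffered first.
  def on_envelope(message)
    @buffer.push message
    process_envelopes?
  end

  private

  # Start processing only if nothing is already in flight.
  def process_envelopes?
    return if @buffer.empty? || @scheduled
    @scheduled = true
    process_envelope
  end

  # Handle one message, then ask the scheduler to check the buffer again.
  def process_envelope
    message = @buffer.shift
    @handler.call(message) if message
  ensure
    @scheduled = false
    @scheduler.schedule { process_envelopes? }
  end
end

log = []
scheduler = TinyScheduler.new
buffer = BufferSketch.new(scheduler) do |message|
  log << "start #{message}"
  # A second message arrives while :a is still being handled.
  buffer.on_envelope(:b) if message == :a
  log << "end #{message}"
end

buffer.on_envelope(:a)
scheduler.run
p log # prints ["start a", "end a", "start b", "end b"]
```

In this model `:b` arrives while `:a` is being handled, so it waits in the buffer and is only picked up on the next scheduled check.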
Public Class Methods
new(core, subsequent, core_options)
Calls superclass method
Concurrent::Actor::Behaviour::Abstract.new
# File lib/concurrent/actor/behaviour/buffer.rb, line 12
def initialize(core, subsequent, core_options)
  super core, subsequent, core_options
  @buffer                     = []
  @receive_envelope_scheduled = false
end
Public Instance Methods
on_envelope(envelope)
# File lib/concurrent/actor/behaviour/buffer.rb, line 18
def on_envelope(envelope)
  @buffer.push envelope
  process_envelopes?
  MESSAGE_PROCESSED
end
on_event(public, event)
Calls superclass method
Concurrent::Actor::Behaviour::Abstract#on_event
# File lib/concurrent/actor/behaviour/buffer.rb, line 44
def on_event(public, event)
  event_name, _ = event
  case event_name
  when :terminated, :restarted
    @buffer.each { |envelope| reject_envelope envelope }
    @buffer.clear
  end
  super public, event_name
end
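The listing above rejects and drops every buffered envelope when the actor is terminated or restarted. A minimal standalone sketch of that behaviour follows. `EventedBuffer` is a hypothetical class, and rejection is modelled simply as recording the envelope in a list; what `reject_envelope` really does is not shown on this page. The destructuring `event_name, _ = event` suggests an event may be either a bare name or an array starting with a name, so the sketch exercises both forms.

```ruby
# Standalone model; it does not load concurrent-ruby.
# EventedBuffer and its rejected list are hypothetical, for illustration only.
class EventedBuffer
  attr_reader :buffer, :rejected

  def initialize(pending)
    @buffer   = pending.dup
    @rejected = []
  end

  # On :terminated or :restarted, reject everything still waiting.
  def on_event(event)
    event_name, _ = event
    case event_name
    when :terminated, :restarted
      @buffer.each { |envelope| reject_envelope envelope }
      @buffer.clear
    end
    event_name
  end

  private

  # Assumption: rejection is modelled as recording the envelope.
  def reject_envelope(envelope)
    @rejected << envelope
  end
end

quiet = EventedBuffer.new(%i[m1 m2])
quiet.on_event(:paused)                # unrelated event: buffer is untouched

stopped = EventedBuffer.new(%i[m1 m2])
stopped.on_event([:terminated, nil])   # array form: name followed by extra data
```

After these calls, `quiet` still holds both messages, while `stopped` has an empty buffer and both messages in its rejected list.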
process_envelope()
# File lib/concurrent/actor/behaviour/buffer.rb, line 35
def process_envelope
  envelope = @buffer.shift
  return nil unless envelope
  pass envelope
ensure
  @receive_envelope_scheduled = false
  core.schedule_execution { process_envelopes? }
end
process_envelopes?()
Ensures that only one envelope processing is scheduled with schedule_execution at a time, which allows other scheduled blocks to be executed before the next envelope is processed. Simply put, this ensures that Core stays responsive to internal calls (like add_child) even when the Actor is flooded with messages.
# File lib/concurrent/actor/behaviour/buffer.rb, line 28
def process_envelopes?
  unless @buffer.empty? || @receive_envelope_scheduled
    @receive_envelope_scheduled = true
    process_envelope
  end
end
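To illustrate why re-scheduling after each envelope keeps internal calls responsive, here is a standalone sketch that mirrors the two methods above against a fake core. `FakeCore`, its `drain` method, and `FloodedBuffer` are hypothetical stand-ins. Only the name `schedule_execution` is taken from the listing, and the FIFO, single-threaded behaviour given to it here is an assumption of the model.

```ruby
# Standalone model; it does not load concurrent-ruby.
# FakeCore and FloodedBuffer are hypothetical stand-ins for illustration.
class FakeCore
  def initialize
    @queue = []
  end

  # Stand-in for schedule_execution: blocks run later in FIFO order.
  def schedule_execution(&block)
    @queue << block
  end

  # Run queued blocks, including ones queued while running.
  def drain
    @queue.shift.call until @queue.empty?
  end
end

class FloodedBuffer
  def initialize(core, &handler)
    @core    = core
    @handler = handler
    @buffer  = []
    @receive_envelope_scheduled = false
  end

  def on_envelope(envelope)
    @buffer.push envelope
    process_envelopes?
  end

  def process_envelopes?
    unless @buffer.empty? || @receive_envelope_scheduled
      @receive_envelope_scheduled = true
      process_envelope
    end
  end

  def process_envelope
    envelope = @buffer.shift
    return nil unless envelope
    @handler.call envelope
  ensure
    # The next check goes to the back of the queue, behind pending blocks.
    @receive_envelope_scheduled = false
    @core.schedule_execution { process_envelopes? }
  end
end

order = []
core = FakeCore.new
flooded = FloodedBuffer.new(core) do |envelope|
  order << envelope
  next unless envelope == :m1
  # While :m1 is handled, more messages pile up and an internal call is queued.
  flooded.on_envelope(:m2)
  flooded.on_envelope(:m3)
  core.schedule_execution { order << :internal_call }
end

flooded.on_envelope(:m1)
core.drain
p order # prints [:m1, :internal_call, :m2, :m3]
```

Because the next buffer check is queued behind the already scheduled internal block, `:internal_call` runs before the remaining buffered messages rather than after all of them.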