class Concurrent::Actor::Core
Core of the actor. @note Whole class should be considered private. An user should use {Context}s and {Reference}s only. @note devel: core should not block on anything, e.g. it cannot wait on children to terminate
that would eat up all threads in task pool and deadlock
Attributes
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
@!attribute [r] reference
Reference to this actor which can be safely passed around. @return [Reference]
@!attribute [r] name
The name of actor instance, it should be uniq (not enforced). Allows easier orientation between actor instances. @return [String]
@!attribute [r] path
Path of this actor. It is used for easier orientation and logging. Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, e.g. `/an_actor/its_child`. @return [String]
@!attribute [r] executor
Executor which is used to process messages. @return [Executor]
@!attribute [r] actor_class
A subclass of {AbstractContext} representing Actor's behaviour. @return [Context]
Public Class Methods
@option opts [String] name @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour @option opts [Array<Object>] args arguments for actor_class instantiation @option opts [Executor] executor, default is `global_io_executor` @option opts [true, false] link, atomically link the actor to its parent (default: true) @option opts [Class] reference a custom descendant of {Reference} to use @option opts [Array<Array(Behavior::Abstract, Array<Object>)>] #behaviour_definition, array of pairs
where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition}
@option opts [CompletableFuture, nil] initialized, if present it'll be set or failed after {Context} initialization @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning ) @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,
can be used to hook actor instance to any logging system, see {Concurrent::Concern::Logging}
@param [Proc] block for class instantiation
# File lib/concurrent/actor/core.rb, line 50 def initialize(opts = {}, &block) super(&nil) synchronize { ns_initialize(opts, &block) } end
Public Instance Methods
@api private
# File lib/concurrent/actor/core.rb, line 74 def add_child(child) guard! Type! child, Reference @children.add child nil end
@api private
# File lib/concurrent/actor/core.rb, line 149 def allocate_context @context = @context_class.allocate end
@param [Class] behaviour_class @return [Behaviour::Abstract, nil] based on behaviour_class
# File lib/concurrent/actor/core.rb, line 137 def behaviour(behaviour_class) @behaviours[behaviour_class] end
@param [Class] behaviour_class @return [Behaviour::Abstract] based on behaviour_class @raise [KeyError] when no behaviour
# File lib/concurrent/actor/core.rb, line 144 def behaviour!(behaviour_class) @behaviours.fetch behaviour_class end
# File lib/concurrent/actor/core.rb, line 130 def broadcast(public, event) log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" } @first_behaviour.on_event(public, event) end
@api private
# File lib/concurrent/actor/core.rb, line 154 def build_context @context.send :initialize_core, self @context.send :initialize, *@args, &@block end
@return [Array<Reference>] of children actors
# File lib/concurrent/actor/core.rb, line 68 def children guard! @children.to_a end
@see Concurrent::Actor::AbstractContext#dead_letter_routing
# File lib/concurrent/actor/core.rb, line 63 def dead_letter_routing @context.dead_letter_routing end
ensures that we are inside of the executor
# File lib/concurrent/actor/core.rb, line 101 def guard! unless Actor.current == reference raise "can be called only inside actor #{reference} but was #{Actor.current}" end end
# File lib/concurrent/actor/core.rb, line 107 def log(level, message = nil, &block) super level, @path, message, &block end
is executed by Reference scheduling processing of new messages can be called from other alternative Reference implementations @param [Envelope] envelope
# File lib/concurrent/actor/core.rb, line 92 def on_envelope(envelope) schedule_execution do log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" } process_envelope envelope end nil end
A parent Actor. When actor is spawned the {Actor.current} becomes its parent. When actor is spawned from a thread outside of an actor ({Actor.current} is nil) {Actor.root} is assigned. @return [Reference, nil]
# File lib/concurrent/actor/core.rb, line 58 def parent @parent_core && @parent_core.reference end
@api private
# File lib/concurrent/actor/core.rb, line 160 def process_envelope(envelope) @first_behaviour.on_envelope envelope end
@api private
# File lib/concurrent/actor/core.rb, line 82 def remove_child(child) guard! Type! child, Reference @children.delete child nil end
Schedules blocks to be executed on executor sequentially, sets Actress.current
# File lib/concurrent/actor/core.rb, line 113 def schedule_execution @serialized_execution.post(@executor) do synchronize do begin Thread.current[:__current_actor__] = reference yield rescue => e log FATAL, e ensure Thread.current[:__current_actor__] = nil end end end nil end
Private Instance Methods
# File lib/concurrent/actor/core.rb, line 210 def initialize_behaviours(opts) @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, ::Array).each do |(behaviour, _)| Child! behaviour, Behaviour::Abstract end @behaviours = {} @first_behaviour = @behaviour_definition.reverse. reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) } end
# File lib/concurrent/actor/core.rb, line 166 def ns_initialize(opts, &block) @mailbox = ::Array.new @serialized_execution = SerializedExecution.new @children = Set.new @context_class = Child! opts.fetch(:class), AbstractContext allocate_context @executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self @name = (Type! opts.fetch(:name), String, Symbol).to_s parent = opts[:parent] @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) if @parent_core.nil? && @name != '/' raise 'only root has no parent' end @path = @parent_core ? File.join(@parent_core.path, @name) : @name @logger = opts[:logger] @parent_core.add_child reference if @parent_core initialize_behaviours opts @args = opts.fetch(:args, []) @block = block initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass schedule_execution do begin build_context initialized.success reference if initialized log DEBUG, 'spawned' rescue => ex log ERROR, ex @first_behaviour.terminate! initialized.fail ex if initialized end end end