class Concurrent::Actor::Core

Core of the actor. @note Whole class should be considered private. An user should use {Context}s and {Reference}s only. @note devel: core should not block on anything, e.g. it cannot wait on children to terminate

that would eat up all threads in task pool and deadlock

Attributes

behaviour_definition[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
context[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
context_class[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
executor[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
name[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
path[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]
reference[R]

@!attribute [r] reference

Reference to this actor which can be safely passed around.
@return [Reference]

@!attribute [r] name

The name of actor instance, it should be uniq (not enforced). Allows easier orientation
between actor instances.
@return [String]

@!attribute [r] path

Path of this actor. It is used for easier orientation and logging.
Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
e.g. `/an_actor/its_child`.
@return [String]

@!attribute [r] executor

Executor which is used to process messages.
@return [Executor]

@!attribute [r] actor_class

A subclass of {AbstractContext} representing Actor's behaviour.
@return [Context]

Public Class Methods

new(opts = {}, &block) click to toggle source

@option opts [String] name @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour @option opts [Array<Object>] args arguments for actor_class instantiation @option opts [Executor] executor, default is `global_io_executor` @option opts [true, false] link, atomically link the actor to its parent (default: true) @option opts [Class] reference a custom descendant of {Reference} to use @option opts [Array<Array(Behavior::Abstract, Array<Object>)>] #behaviour_definition, array of pairs

where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition}

@option opts [CompletableFuture, nil] initialized, if present it'll be set or failed after {Context} initialization @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning ) @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,

can be used to hook actor instance to any logging system, see {Concurrent::Concern::Logging}

@param [Proc] block for class instantiation

Calls superclass method
# File lib/concurrent/actor/core.rb, line 50
def initialize(opts = {}, &block)
  super(&nil)
  synchronize { ns_initialize(opts, &block) }
end

Public Instance Methods

add_child(child) click to toggle source

@api private

# File lib/concurrent/actor/core.rb, line 74
def add_child(child)
  guard!
  Type! child, Reference
  @children.add child
  nil
end
allocate_context() click to toggle source

@api private

# File lib/concurrent/actor/core.rb, line 149
def allocate_context
  @context = @context_class.allocate
end
behaviour(behaviour_class) click to toggle source

@param [Class] behaviour_class @return [Behaviour::Abstract, nil] based on behaviour_class

# File lib/concurrent/actor/core.rb, line 137
def behaviour(behaviour_class)
  @behaviours[behaviour_class]
end
behaviour!(behaviour_class) click to toggle source

@param [Class] behaviour_class @return [Behaviour::Abstract] based on behaviour_class @raise [KeyError] when no behaviour

# File lib/concurrent/actor/core.rb, line 144
def behaviour!(behaviour_class)
  @behaviours.fetch behaviour_class
end
broadcast(public, event) click to toggle source
# File lib/concurrent/actor/core.rb, line 130
def broadcast(public, event)
  log(DEBUG) { "event: #{event.inspect} (#{public ? 'public' : 'private'})" }
  @first_behaviour.on_event(public, event)
end
build_context() click to toggle source

@api private

# File lib/concurrent/actor/core.rb, line 154
def build_context
  @context.send :initialize_core, self
  @context.send :initialize, *@args, &@block
end
children() click to toggle source

@return [Array<Reference>] of children actors

# File lib/concurrent/actor/core.rb, line 68
def children
  guard!
  @children.to_a
end
dead_letter_routing() click to toggle source

@see Concurrent::Actor::AbstractContext#dead_letter_routing

# File lib/concurrent/actor/core.rb, line 63
def dead_letter_routing
  @context.dead_letter_routing
end
guard!() click to toggle source

ensures that we are inside of the executor

# File lib/concurrent/actor/core.rb, line 101
def guard!
  unless Actor.current == reference
    raise "can be called only inside actor #{reference} but was #{Actor.current}"
  end
end
log(level, message = nil, &block) click to toggle source
Calls superclass method
# File lib/concurrent/actor/core.rb, line 107
def log(level, message = nil, &block)
  super level, @path, message, &block
end
on_envelope(envelope) click to toggle source

is executed by Reference scheduling processing of new messages can be called from other alternative Reference implementations @param [Envelope] envelope

# File lib/concurrent/actor/core.rb, line 92
def on_envelope(envelope)
  schedule_execution do
    log(DEBUG) { "was #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}" }
    process_envelope envelope
  end
  nil
end
parent() click to toggle source

A parent Actor. When actor is spawned the {Actor.current} becomes its parent. When actor is spawned from a thread outside of an actor ({Actor.current} is nil) {Actor.root} is assigned. @return [Reference, nil]

# File lib/concurrent/actor/core.rb, line 58
def parent
  @parent_core && @parent_core.reference
end
process_envelope(envelope) click to toggle source

@api private

# File lib/concurrent/actor/core.rb, line 160
def process_envelope(envelope)
  @first_behaviour.on_envelope envelope
end
remove_child(child) click to toggle source

@api private

# File lib/concurrent/actor/core.rb, line 82
def remove_child(child)
  guard!
  Type! child, Reference
  @children.delete child
  nil
end
schedule_execution() { || ... } click to toggle source

Schedules blocks to be executed on executor sequentially, sets Actress.current

# File lib/concurrent/actor/core.rb, line 113
def schedule_execution
  @serialized_execution.post(@executor) do
    synchronize do
      begin
        Thread.current[:__current_actor__] = reference
        yield
      rescue => e
        log FATAL, e
      ensure
        Thread.current[:__current_actor__] = nil
      end
    end
  end

  nil
end

Private Instance Methods

initialize_behaviours(opts) click to toggle source
# File lib/concurrent/actor/core.rb, line 210
def initialize_behaviours(opts)
  @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, ::Array).each do |(behaviour, _)|
    Child! behaviour, Behaviour::Abstract
  end
  @behaviours           = {}
  @first_behaviour      = @behaviour_definition.reverse.
      reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) }
end
ns_initialize(opts, &block) click to toggle source
# File lib/concurrent/actor/core.rb, line 166
def ns_initialize(opts, &block)
  @mailbox              = ::Array.new
  @serialized_execution = SerializedExecution.new
  @children             = Set.new

  @context_class = Child! opts.fetch(:class), AbstractContext
  allocate_context

  @executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService
  raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor

  @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
  @name      = (Type! opts.fetch(:name), String, Symbol).to_s

  parent       = opts[:parent]
  @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
  if @parent_core.nil? && @name != '/'
    raise 'only root has no parent'
  end

  @path   = @parent_core ? File.join(@parent_core.path, @name) : @name
  @logger = opts[:logger]

  @parent_core.add_child reference if @parent_core

  initialize_behaviours opts

  @args       = opts.fetch(:args, [])
  @block      = block
  initialized = Type! opts[:initialized], Edge::CompletableFuture, NilClass

  schedule_execution do
    begin
      build_context
      initialized.success reference if initialized
      log DEBUG, 'spawned'
    rescue => ex
      log ERROR, ex
      @first_behaviour.terminate!
      initialized.fail ex if initialized
    end
  end
end