class Dynflow::Director::ExecutionPlanManager

Attributes

execution_plan[R]
future[R]

Public Class Methods

new(world, execution_plan, future) click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 11
# Builds the manager for a single execution plan and marks the plan as running.
#
# @param world [World] world the plan runs in; also handed to the
#   RunningStepsManager created here
# @param execution_plan [ExecutionPlan] plan to manage; its state must be
#   +:planned+ or +:paused+
# @param future [Concurrent::Promises::ResolvableFuture] future kept for this
#   plan; #start refuses to run once it is resolved
# @raise [RuntimeError] when the plan is in any state other than +:planned+
#   or +:paused+
def initialize(world, execution_plan, future)
  @world                 = Type! world, World
  @execution_plan        = Type! execution_plan, ExecutionPlan
  @future                = Type! future, Concurrent::Promises::ResolvableFuture
  @running_steps_manager = RunningStepsManager.new(world)
  @halted                = false

  # The message names exactly the states accepted by the guard, so it cannot
  # contradict the reported state.
  unless [:planned, :paused].include?(execution_plan.state)
    raise "execution_plan is not in planned or paused state, it's #{execution_plan.state}"
  end
  execution_plan.update_state(:running)
end

Public Instance Methods

done?() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 80
# Tells whether this manager has nothing left to do.
#
# A halted manager is always done. Otherwise each phase manager that has been
# created (run, finalize) must itself report done; a phase whose manager was
# never created counts as done.
#
# @return [Boolean]
def done?
  return @halted if @halted

  run_phase_over = !@run_manager || @run_manager.done?
  return run_phase_over unless run_phase_over

  !@finalize_manager || @finalize_manager.done?
end
event(event) click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 72
# Passes an event on to the running steps manager.
#
# @param event [Event] event whose execution_plan_id must equal the id of the
#   managed execution plan
# @return whatever the running steps manager returns for the event
# @raise [RuntimeError] when the event targets a different execution plan
def event(event)
  Type! event, Event
  plan_matches = event.execution_plan_id == @execution_plan.id
  raise "event #{event.inspect} doesn't belong to plan #{@execution_plan.id}" unless plan_matches

  @running_steps_manager.event(event)
end
halt() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 29
# Flags the manager as halted and terminates the running steps manager.
#
# Once the flag is set, #done? answers true and #compute_next_from_step
# stops handing out further work.
#
# @return whatever terminating the running steps manager returns
def halt
  @halted = true
  # #terminate issues the very same @running_steps_manager.terminate call.
  terminate
end
prepare_next_step(step) click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 40
# Wraps a step in a StepWorkItem and registers the pair with the running
# steps manager.
#
# @param step step to run next; its queue is copied onto the work item
# @return [StepWorkItem] the work item created for the step
def prepare_next_step(step)
  work_item = StepWorkItem.new(execution_plan.id, step, step.queue, @world.id)
  @running_steps_manager.add(step, work_item)
  work_item
end
restart() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 34
# Forgets both phase managers and begins again through #start.
#
# @return whatever #start returns
def restart
  @run_manager = @finalize_manager = nil
  start
end
start() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 24
# Kicks off execution and returns the first work to do.
#
# Tries the run phase first, then the finalize phase, and falls back to
# #finish when neither phase yields anything.
#
# @return the first truthy result of #start_run, #start_finalize or #finish
# @raise [RuntimeError] when the future is already resolved
def start
  raise "The future was already set" if @future.resolved?

  start_run || start_finalize || finish
end
terminate() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 84
# Terminates the running steps manager.
#
# Unlike #halt, this leaves the halted flag untouched.
#
# @return whatever the running steps manager's terminate returns
def terminate
  @running_steps_manager.terminate
end
what_is_next(work) click to toggle source

@return [Array<WorkItem>] work items to continue with

# File lib/dynflow/director/execution_plan_manager.rb, line 47
# Decides how to continue after a work item has come back.
#
# For a StepWorkItem the finished step is stored in the plan and reported to
# the running steps manager; when that manager says the step is suspended its
# own answer is returned, otherwise the follow-up work is computed from the
# step. For a FinalizeWorkItem any returned step data is deserialized and
# stored, the finalize manager is marked done and #finish is called.
#
# @param work [WorkItem] the work item that was just processed
# @return [Array<WorkItem>] work items to continue with
# @raise [RuntimeError] when a FinalizeWorkItem arrives without a finalize
#   manager, or when the work item is of an unexpected kind
def what_is_next(work)
  Type! work, WorkItem

  case work
  when StepWorkItem
    finished_step = work.step
    update_steps([finished_step])
    suspended, pending_work = @running_steps_manager.done(finished_step)
    suspended ? pending_work : compute_next_from_step(finished_step)
  when FinalizeWorkItem
    steps_data = work.finalize_steps_data
    if steps_data
      rebuilt = steps_data.map { |data| Serializable.from_hash(data, execution_plan.id, @world) }
      update_steps(rebuilt)
    end
    raise "Finalize work item without @finalize_manager ready" unless @finalize_manager

    @finalize_manager.done!
    finish
  else
    raise "Unexpected work #{work}"
  end
end

Private Instance Methods

compute_next_from_step(step) click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 94
# Works out what follows a finished step of the run phase.
#
# @param step the step that has just finished
# @return [Array] empty when halted; prepared work items for the next steps
#   while the run phase continues; otherwise the result of #start_finalize
#   or, failing that, #finish
# @raise [RuntimeError] when there is no run manager or it was already done
#   before this step was processed
def compute_next_from_step(step)
  raise "run manager not set" unless @run_manager
  raise "run manager already done" if @run_manager.done?
  return [] if @halted

  upcoming = @run_manager.what_is_next(step)
  # The run manager is asked again: processing the step may have completed it.
  return upcoming.map { |following| prepare_next_step(following) } unless @run_manager.done?

  start_finalize || finish
end
finish() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 126
# Ends processing by delegating to #no_work.
#
# @return whatever #no_work returns
def finish
  no_work
end
no_work() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 107
# Answers with an empty list of work, which is only legal once #done? holds.
#
# @return [Array] always empty
# @raise [RuntimeError] when called while the manager is not done
def no_work
  return [] if done?

  raise "No work but not done"
end
start_finalize() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 119
# Opens the finalize phase when the plan has a finalize flow.
#
# @return [Array<FinalizeWorkItem>, nil] a single work item placed on the
#   queue of the first finalize step, or nil when the finalize flow is empty
# @raise [RuntimeError] when a finalize manager already exists
def start_finalize
  plan = execution_plan
  return if plan.finalize_flow.empty?
  raise 'finalize phase already started' if @finalize_manager

  @finalize_manager = SequentialManager.new(@world, plan)
  finalize_work = FinalizeWorkItem.new(plan.id, plan.finalize_steps.first.queue, @world.id)
  [finalize_work]
end
start_run() click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 112
# Opens the run phase when the plan has a run flow.
#
# @return [Array, nil] work items prepared for the steps the flow manager
#   starts with, or nil when the run flow is empty
# @raise [RuntimeError] when a run manager already exists, or when a non-empty
#   run flow yields no steps to start
def start_run
  return if execution_plan.run_flow.empty?
  raise 'run phase already started' if @run_manager

  @run_manager = FlowManager.new(execution_plan, execution_plan.run_flow)
  initial_work = @run_manager.start.map { |s| prepare_next_step(s) }

  # An explicit message is required here: a bare `raise` reports only
  # "unhandled exception" and, when another exception is being handled
  # ($! is set), re-raises that unrelated exception instead.
  if initial_work.empty?
    raise "no steps to start in non-empty run flow of execution plan #{execution_plan.id}"
  end

  initial_work
end
update_steps(steps) click to toggle source
# File lib/dynflow/director/execution_plan_manager.rb, line 90
# Stores the given steps in the execution plan, keyed by step id, replacing
# any entry already held under the same id.
#
# @param steps [Array] steps responding to #id
# @return [Array] the steps that were passed in
def update_steps(steps)
  steps.each do |updated|
    execution_plan.steps[updated.id] = updated
  end
end