class Proxy::RemoteExecution::Ssh::MQTT::DispatcherActor::Tracker

Public Class Methods

new(limit, clock) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 61
def initialize(limit, clock)
  @clock = clock
  @limit = limit
  @jobs = {}
  @pending = []
  @running = Set.new
  @hot = Set.new
  @cold = Set.new
end

Public Instance Methods

dispatch_pending() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 110
def dispatch_pending
  pending_count.times do
    mqtt_notify(@pending.first)
    @hot << @pending.shift
  end
end
done(uuid) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 89
def done(uuid)
  @jobs.delete(uuid)
  [@pending, @running, @hot, @cold].each do |source|
    source.delete(uuid)
  end
  dispatch_pending
end
mqtt_notify(uuid) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 125
def mqtt_notify(uuid)
  job = @jobs[uuid]
  return if job.nil?

  Proxy::RemoteExecution::Ssh::MQTT.publish(job.topic, JSON.dump(job.payload))
end
needs_processing?() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 97
def needs_processing?
  pending_count.positive? || @hot.any? || @cold.any?
end
new(uuid, topic, payload) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 71
def new(uuid, topic, payload)
  @jobs[uuid] = JobDefinition.new(uuid, topic, payload)
  @pending << uuid
  dispatch_pending
end
pending_count() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 101
def pending_count
  pending = @pending.count
  return pending if @limit.nil?

  running = [@running, @hot, @cold].map(&:count).sum
  capacity = @limit - running
  pending > capacity ? capacity : pending
end
process() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 117
def process
  @cold.each { |uuid| schedule_resend(uuid) }
  @cold = @hot
  @hot = Set.new

  dispatch_pending
end
resend(uuid) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 82
def resend(uuid)
  return unless @jobs[uuid]

  @pending << uuid
  dispatch_pending
end
resend_interval() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 140
def resend_interval
  settings[:mqtt_resend_interval]
end
running(uuid) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 77
def running(uuid)
  [@pending, @hot, @cold].each { |source| source.delete(uuid) }
  @running << uuid
end
schedule_resend(uuid) click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 136
def schedule_resend(uuid)
  @clock.ping(Proxy::RemoteExecution::Ssh::MQTT::Dispatcher.instance, resend_interval, uuid, :resend)
end
settings() click to toggle source
# File lib/smart_proxy_remote_execution_ssh/mqtt/dispatcher.rb, line 132
def settings
  Proxy::RemoteExecution::Ssh::Plugin.settings
end