class Concurrent::Edge::Event

Represents an event which will happen in the future (i.e. will be completed). It always has to happen.

Constants

COMPLETED

@!visibility private

PENDING

@!visibility private

Public Class Methods

new(promise, default_executor) click to toggle source
Calls superclass method
# File lib/concurrent/edge/future.rb, line 197
# Builds an event in the PENDING state.
#
# @param promise stored in @Promise
# @param default_executor stored in @DefaultExecutor
def initialize(promise, default_executor)
  super()

  # collaborators handed in by the caller
  @Promise = promise
  @DefaultExecutor = default_executor

  # primitives used to block and wake waiting threads
  @Lock = Mutex.new
  @Condition = ConditionVariable.new
  @Waiters = AtomicFixnum.new(0)

  # bookkeeping: touched flag and the stack of registered callbacks
  @Touched = AtomicBoolean.new(false)
  @Callbacks = LockFreeStack.new

  self.internal_state = PENDING
end

Public Instance Methods

&(other)
Alias for: zip
add_callback(method, *args) click to toggle source

@!visibility private

# File lib/concurrent/edge/future.rb, line 366
# Registers a callback, or runs it immediately when the event is already
# completed.
#
# @param method name of the callback method, dispatched through call_callback
# @param args extra arguments stored alongside the method name
# @return [self]
# @!visibility private
def add_callback(method, *args)
  if completed?
    call_callback(method, *args)
    return self
  end

  @Callbacks.push([method, *args])
  # Re-check after the push; presumably this covers a completion that
  # happened between the first check and the push.
  call_callbacks if completed?
  self
end
blocks() click to toggle source

@!visibility private just for inspection @return [Array<AbstractPromise>]

# File lib/concurrent/edge/future.rb, line 353
# Collects every AbstractPromise found inside the stored callback entries.
# Just for inspection.
#
# @return [Array<AbstractPromise>]
# @!visibility private
def blocks
  @Callbacks.flat_map do |entry|
    entry.select { |item| item.is_a?(AbstractPromise) }
  end
end
callbacks() click to toggle source

@!visibility private just for inspection

# File lib/concurrent/edge/future.rb, line 361
# Returns the entries currently held in @Callbacks as an Array.
# Just for inspection.
#
# @!visibility private
def callbacks
  @Callbacks.each.to_a
end
chain(executor = nil, &callback) click to toggle source

@yield [success, value, reason] of the parent

# File lib/concurrent/edge/future.rb, line 260
# Chains a callback behind this event by building a ChainPromise.
#
# @param executor executor for the callback; falls back to @DefaultExecutor
#   when nil (or false)
# @yield [success, value, reason] of the parent
# @return the future of the newly created ChainPromise
def chain(executor = nil, &callback)
  target_executor = executor || @DefaultExecutor
  chained = ChainPromise.new(self, @DefaultExecutor, target_executor, &callback)
  chained.future
end
Also aliased as: then
chain_completable(completable_event) click to toggle source
# File lib/concurrent/edge/future.rb, line 266
# Completes the given completable event with COMPLETED once this event
# completes, using a synchronous on_completion! callback.
#
# @param completable_event object responding to complete_with
# @return whatever on_completion! returns
def chain_completable(completable_event)
  on_completion! do
    completable_event.complete_with(COMPLETED)
  end
end
Also aliased as: tangle
complete?(state = internal_state)
Alias for: completed?
complete_with(state, raise_on_reassign = true) click to toggle source

@!visibility private

# File lib/concurrent/edge/future.rb, line 338
# Moves the event from PENDING to the given state, wakes waiting threads and
# runs the registered callbacks.
#
# @param state the state to install when the event is still PENDING
# @param raise_on_reassign [Boolean] when truthy, completing an already
#   completed event raises instead of returning false
# @return [self, false] self when this call completed the event; false when it
#   was already completed and raise_on_reassign is falsy
# @raise [Concurrent::MultipleAssignmentError] when the event was already
#   completed and raise_on_reassign is truthy
# @!visibility private
def complete_with(state, raise_on_reassign = true)
  unless compare_and_set_internal_state(PENDING, state)
    # Raise explicitly: merely instantiating the error would leave
    # raise_on_reassign without any effect.
    raise Concurrent::MultipleAssignmentError, 'Event can be completed only once' if raise_on_reassign
    return false
  end

  # go to synchronized block only if there were waiting threads
  @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
  call_callbacks
  self
end
completed?(state = internal_state) click to toggle source

Has the Event been completed? @return [Boolean]

# File lib/concurrent/edge/future.rb, line 228
# Has the Event been completed?
#
# @param state the state object to query; defaults to the current internal_state
# @return [Boolean] the answer of state.completed?
def completed?(state = internal_state)
  state.completed?
end
Also aliased as: complete?
default_executor() click to toggle source

@return [Executor] current default executor @see with_default_executor

# File lib/concurrent/edge/future.rb, line 255
# @return [Executor] the executor currently stored in @DefaultExecutor
# @see with_default_executor
def default_executor
  @DefaultExecutor
end
delay() click to toggle source

Inserts a delay into the chain of Futures, making the rest of it lazily evaluated. @return [Event]

# File lib/concurrent/edge/future.rb, line 286
# Inserts a delay into the chain: zips this event with the event of a fresh
# DelayPromise.
#
# @return [Event] the event of the resulting ZipEventEventPromise
def delay
  delayed_event = DelayPromise.new(@DefaultExecutor).event
  zipped = ZipEventEventPromise.new(self, delayed_event, @DefaultExecutor)
  zipped.event
end
incomplete?(state = internal_state)
Alias for: pending?
inspect() click to toggle source
# File lib/concurrent/edge/future.rb, line 328
# Like to_s, but with the trailing character of to_s replaced by a listing
# of the promises reported by blocks.
#
# @return [String]
def inspect
  head = to_s[0...-1] # to_s without its last character
  listing = blocks.map { |blocked| blocked.to_s }.join(', ')
  "#{head} blocks:[#{listing}]>"
end
on_completion(executor = nil, &callback) click to toggle source

@yield [success, value, reason] executed async on `executor` when completed @return self

# File lib/concurrent/edge/future.rb, line 308
# Registers a callback that is run asynchronously once the event completes.
#
# @param executor executor to run the callback on; falls back to
#   @DefaultExecutor when nil (or false)
# @yield [success, value, reason] executed async on `executor` when completed
# @return self
def on_completion(executor = nil, &callback)
  chosen_executor = executor || @DefaultExecutor
  add_callback(:async_callback_on_completion, chosen_executor, callback)
end
on_completion!(&callback) click to toggle source

@yield [success, value, reason] executed sync when completed @return self

# File lib/concurrent/edge/future.rb, line 314
# Registers a callback that is run synchronously once the event completes.
#
# @yield [success, value, reason] executed sync when completed
# @return self
def on_completion!(&callback)
  add_callback(:callback_on_completion, callback)
end
pending?(state = internal_state) click to toggle source

Is Event/Future pending? @return [Boolean]

# File lib/concurrent/edge/future.rb, line 216
# Is the Event/Future pending?
#
# @param state the state object to query; defaults to the current internal_state
# @return [Boolean] the negation of state.completed?
def pending?(state = internal_state)
  !state.completed?
end
Also aliased as: incomplete?
promise() click to toggle source

@!visibility private only for inspection

# File lib/concurrent/edge/future.rb, line 378
# Reader for @Promise; only for inspection.
#
# @!visibility private
def promise
  @Promise
end
set(*args, &block) click to toggle source
# File lib/concurrent/edge/future.rb, line 332
# Always raises: an Event cannot be set directly. The arguments and block
# are accepted but ignored.
#
# @raise [RuntimeError] always, pointing to the completable variants
def set(*args, &block)
  hint = 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' \
         'constructed by Concurrent.event or Concurrent.future respectively.'
  raise hint
end
state() click to toggle source

@return [:pending, :completed]

# File lib/concurrent/edge/future.rb, line 210
# Current state as a symbol, obtained by calling to_sym on internal_state.
#
# @return [:pending, :completed]
def state
  internal_state.to_sym
end
tangle(completable_event)
Alias for: chain_completable
then(executor = nil, &callback)
Alias for: chain
then_select(*channels) click to toggle source

Zips with the selected value from the supplied channels @return [Future]

# File lib/concurrent/edge/future.rb, line 302
# Zips this event with the value selected from the supplied channels.
#
# @param channels channels forwarded to Concurrent.select
# @return [Future] the future of a ZipFutureEventPromise built from the
#   selection, this event and @DefaultExecutor
def then_select(*channels)
  selected = Concurrent.select(*channels)
  # The promise has to be instantiated with .new; calling the constant name
  # like a method would raise NoMethodError.
  ZipFutureEventPromise.new(selected, self, @DefaultExecutor).future
end
to_s() click to toggle source
# File lib/concurrent/edge/future.rb, line 324
# Short textual form: class name, hexadecimal of (object_id << 1), and the
# current state.
#
# @return [String]
def to_s
  format('<#%s:0x%x %s>', self.class, object_id << 1, state.to_sym)
end
touch() click to toggle source

@!visibility private

# File lib/concurrent/edge/future.rb, line 247
# Marks the event as touched and forwards the touch to the promise, but only
# on the call for which @Touched.make_true reports a change.
#
# @return [self]
# @!visibility private
def touch
  first_touch = @Touched.make_true
  # distribute touch to promise only once
  @Promise.touch if first_touch
  self
end
touched() click to toggle source

@!visibility private only for inspection

# File lib/concurrent/edge/future.rb, line 384
# Returns the current value of the @Touched flag; only for inspection.
#
# @!visibility private
def touched
  @Touched.value
end
unscheduled?() click to toggle source
# File lib/concurrent/edge/future.rb, line 220
# Not supported on this class.
#
# @raise [RuntimeError] always, with the message 'unsupported'
def unscheduled?
  raise 'unsupported'
end
wait(timeout = nil) click to toggle source

Waits until the Event is complete. @param [Numeric] timeout the maximum time in seconds to wait. @return [Event, true, false] self or true/false if timeout is used @!macro [attach] edge.periodical_wait

@note a thread should wait only once! For repeated checking use faster `completed?` check.
  If thread waits periodically it will dangerously grow the waiters stack.
# File lib/concurrent/edge/future.rb, line 240
# Touches the event and blocks until it is complete or the timeout elapses.
#
# @param timeout [Numeric, nil] the maximum time in seconds to wait
# @return [Event, true, false] self when no timeout is given, otherwise the
#   result of wait_until_complete
def wait(timeout = nil)
  touch
  completed_in_time = wait_until_complete(timeout)
  return self unless timeout

  completed_in_time
end
waiting_threads() click to toggle source

@!visibility private only for debugging inspection

# File lib/concurrent/edge/future.rb, line 390
# Returns whatever @Waiters enumerates, as an Array; only for debugging
# inspection.
#
# NOTE(review): initialize assigns @Waiters an AtomicFixnum counter; confirm
# that it actually responds to #each, otherwise this call cannot succeed.
#
# @!visibility private
def waiting_threads
  @Waiters.each.to_a
end
with_default_executor(executor) click to toggle source

Changes the default executor for the rest of the chain @return [Event]

# File lib/concurrent/edge/future.rb, line 320
# Changes the default executor for the rest of the chain.
#
# @param executor passed, together with self, to EventWrapperPromise.new
# @return [Event] the result of calling future on the new EventWrapperPromise
def with_default_executor(executor)
  EventWrapperPromise.new(self, executor).future
end
zip(other) click to toggle source

Zip with future producing new Future @return [Event]

# File lib/concurrent/edge/future.rb, line 274
# Zips this event with another event or future.
#
# @param other the object to zip with; a Future is combined through
#   ZipFutureEventPromise, anything else through ZipEventEventPromise
# @return the future of the promise that was built
def zip(other)
  # is_a? is the Ruby type predicate; `is?` is not defined on objects.
  if other.is_a?(Future)
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end
Also aliased as: &

Private Instance Methods

async_callback_on_completion(executor, callback) click to toggle source
# File lib/concurrent/edge/future.rb, line 418
# Runs callback_on_completion for the callback inside a block handed to
# with_async for the given executor.
#
# @param executor executor passed to with_async
# @param callback callable forwarded to callback_on_completion
def async_callback_on_completion(executor, callback)
  with_async(executor) do
    callback_on_completion(callback)
  end
end
call_callback(method, *args) click to toggle source
# File lib/concurrent/edge/future.rb, line 430
# Dispatches a single callback by sending `method` with `args` to self.
#
# @param method name of the method to invoke
# @param args arguments passed along to that method
# @return whatever the invoked method returns
def call_callback(method, *args)
  self.send method, *args
end
call_callbacks() click to toggle source
# File lib/concurrent/edge/future.rb, line 434
# Drains @Callbacks: pops entries one at a time and dispatches each through
# call_callback until a pop yields no method name.
def call_callbacks
  # each entry destructures into the method name followed by its arguments
  method, *args = @Callbacks.pop
  while method
    call_callback method, *args
    method, *args = @Callbacks.pop
  end
end
callback_notify_blocked(promise) click to toggle source
# File lib/concurrent/edge/future.rb, line 426
# Callback that notifies the given promise by calling on_done with this event.
#
# @param promise object responding to on_done
def callback_notify_blocked(promise)
  promise.on_done self
end
callback_on_completion(callback) click to toggle source
# File lib/concurrent/edge/future.rb, line 422
# Invokes the callback without arguments.
#
# @param callback object responding to call
# @return whatever the callback returns
def callback_on_completion(callback)
  callback.call
end
wait_until_complete(timeout) click to toggle source

@return [true, false]

# File lib/concurrent/edge/future.rb, line 397
# Blocks the calling thread on @Condition until the event is completed or
# the timeout elapses. Returns immediately when already completed.
#
# @param timeout [Numeric, nil] passed to @Condition.wait
# @return [true, false] the result of completed? after waiting
def wait_until_complete(timeout)
  return true if completed?

  @Lock.synchronize do
    # Register as a waiter for the duration of the wait.
    @Waiters.increment
    begin
      # Re-check under the lock before waiting.
      @Condition.wait(@Lock, timeout) unless completed?
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end

  completed?
end
with_async(executor, *args, &block) click to toggle source
# File lib/concurrent/edge/future.rb, line 414
# Hands the block, together with args, to Concurrent.post_on for the given
# executor.
#
# @param executor executor passed to Concurrent.post_on
# @param args arguments passed along with the block
# @return whatever Concurrent.post_on returns
def with_async(executor, *args, &block)
  Concurrent.post_on(executor, *args, &block)
end