class Concurrent::Edge::Event
Represents an event which will happen in the future (it will be completed). It always has to happen.
Constants
- COMPLETED
@!visibility private
- PENDING
@!visibility private
Public Class Methods
# File lib/concurrent/edge/future.rb, line 197
# Sets up the synchronization primitives and bookkeeping for a new event
# and puts it into the PENDING state.
#
# @param promise stored in @Promise
# @param default_executor stored in @DefaultExecutor
def initialize(promise, default_executor)
  super()
  @Lock = Mutex.new
  @Condition = ConditionVariable.new
  @Promise = promise
  @DefaultExecutor = default_executor
  @Touched = AtomicBoolean.new(false)
  @Callbacks = LockFreeStack.new
  @Waiters = AtomicFixnum.new(0)
  self.internal_state = PENDING
end
Public Instance Methods
@!visibility private
# File lib/concurrent/edge/future.rb, line 366
# Registers a callback given as a method name plus its arguments.
# When the event is already completed the callback is invoked right away;
# otherwise it is pushed onto @Callbacks.
#
# @return self
def add_callback(method, *args)
  if completed?
    call_callback(method, *args)
    return self
  end

  @Callbacks.push([method, *args])
  # Completion may have happened while pushing; re-check and flush if so.
  call_callbacks if completed?
  self
end
@!visibility private just for inspection @return [Array<AbstractPromise>]
# File lib/concurrent/edge/future.rb, line 353
# Collects every AbstractPromise found among the registered callback entries.
#
# @return [Array<AbstractPromise>]
def blocks
  @Callbacks.flat_map do |entry|
    entry.select { |item| item.is_a?(AbstractPromise) }
  end
end
@!visibility private just for inspection
# File lib/concurrent/edge/future.rb, line 361
# Returns the entries currently held in @Callbacks as an Array.
def callbacks
  entries = @Callbacks.each
  entries.to_a
end
@yield [success, value, reason] of the parent
# File lib/concurrent/edge/future.rb, line 260
# Builds a ChainPromise on top of this event and returns its future.
#
# @param executor used instead of @DefaultExecutor when given
# @yield passed on to ChainPromise
def chain(executor = nil, &callback)
  chosen_executor = executor || @DefaultExecutor
  chained = ChainPromise.new(self, @DefaultExecutor, chosen_executor, &callback)
  chained.future
end
# File lib/concurrent/edge/future.rb, line 266
# Registers a synchronous completion callback that completes
# +completable_event+ with COMPLETED.
def chain_completable(completable_event)
  on_completion! do
    completable_event.complete_with(COMPLETED)
  end
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 338
# Moves the event from PENDING to +state+, wakes waiting threads and runs
# the registered callbacks.
#
# @param state the new internal state
# @param raise_on_reassign [Boolean] whether a second completion attempt
#   raises instead of returning false
# @return [self, false] self on success, false when the event was already
#   completed and +raise_on_reassign+ is falsy
# @raise [Concurrent::MultipleAssignmentError] when the event was already
#   completed and +raise_on_reassign+ is truthy
def complete_with(state, raise_on_reassign = true)
  if compare_and_set_internal_state(PENDING, state)
    # go to synchronized block only if there were waiting threads
    @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
    call_callbacks
  else
    # The error used to be instantiated but never raised, which made
    # raise_on_reassign a no-op.
    raise Concurrent::MultipleAssignmentError, 'Event can be completed only once' if raise_on_reassign
    return false
  end
  self
end
Has the Event been completed? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 228
# Tells whether the given state (the current internal state by default)
# reports itself as completed.
#
# @return [Boolean]
def completed?(state = internal_state)
  checked_state = state
  checked_state.completed?
end
@return [Executor] current default executor @see with_default_executor
# File lib/concurrent/edge/future.rb, line 255
# Reader for the executor stored in @DefaultExecutor.
def default_executor
  @DefaultExecutor
end
Inserts a delay into the chain of Futures, making the rest of it lazily evaluated. @return [Event]
# File lib/concurrent/edge/future.rb, line 286
# Zips this event with the event of a fresh DelayPromise and returns the
# resulting event.
def delay
  delayed_event = DelayPromise.new(@DefaultExecutor).event
  zipped = ZipEventEventPromise.new(self, delayed_event, @DefaultExecutor)
  zipped.event
end
# File lib/concurrent/edge/future.rb, line 328
# Extends #to_s (minus its final character) with the list of promises
# returned by #blocks.
def inspect
  head = to_s[0..-2]
  blocked = blocks.map(&:to_s).join(', ')
  "#{head} blocks:[#{blocked}]>"
end
@yield [success, value, reason] executed async on `executor` when completed @return self
# File lib/concurrent/edge/future.rb, line 308
# Registers +callback+ as an :async_callback_on_completion entry, bound to
# +executor+ or, when none is given, to @DefaultExecutor.
#
# @return whatever add_callback returns
def on_completion(executor = nil, &callback)
  chosen_executor = executor || @DefaultExecutor
  add_callback(:async_callback_on_completion, chosen_executor, callback)
end
@yield [success, value, reason] executed sync when completed @return self
# File lib/concurrent/edge/future.rb, line 314
# Registers +callback+ as a :callback_on_completion entry.
#
# @return whatever add_callback returns
def on_completion!(&callback)
  add_callback(:callback_on_completion, callback)
end
Is Event/Future pending? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 216
# Negation of the state's completed? flag; uses the current internal state
# unless one is passed in.
#
# @return [Boolean]
def pending?(state = internal_state)
  finished = state.completed?
  !finished
end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 378
# Reader for the promise stored in @Promise.
def promise
  @Promise
end
# File lib/concurrent/edge/future.rb, line 332
# Always raises: this method only points callers to the completable
# variants named in the message.
#
# @raise [RuntimeError] unconditionally
def set(*args, &block)
  message = 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' \
            'constructed by Concurrent.event or Concurrent.future respectively.'
  raise RuntimeError, message
end
@return [:pending, :completed]
# File lib/concurrent/edge/future.rb, line 210
# Symbol form of the current internal state.
def state
  current = internal_state
  current.to_sym
end
Zips with the selected value from the supplied channels. @return [Future]
# File lib/concurrent/edge/future.rb, line 302
# Zips the result of Concurrent.select over +channels+ with this event.
#
# @param channels passed through to Concurrent.select
# @return the future of the created ZipFutureEventPromise
def then_select(*channels)
  # The promise must be instantiated with .new; calling the constant like a
  # method raised NoMethodError.
  selected = Concurrent.select(*channels)
  ZipFutureEventPromise.new(selected, self, @DefaultExecutor).future
end
# File lib/concurrent/edge/future.rb, line 324
# Short textual form: class name, hex of (object_id << 1) and the state.
def to_s
  address = format('%x', object_id << 1)
  "<##{self.class}:0x#{address} #{state.to_sym}>"
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 247
# Forwards the touch to @Promise, but only when @Touched.make_true returns
# a truthy value, so the promise is touched only once.
#
# @return self
def touch
  first_touch = @Touched.make_true
  @Promise.touch if first_touch
  self
end
@!visibility private only for inspection
# File lib/concurrent/edge/future.rb, line 384
# Current value of the @Touched flag.
def touched
  flag = @Touched
  flag.value
end
# File lib/concurrent/edge/future.rb, line 220
# Not supported here; always raises.
#
# @raise [RuntimeError] unconditionally
def unscheduled?
  raise RuntimeError, 'unsupported'
end
Waits until the Event is complete. @param [Numeric] timeout the maximum time in seconds to wait. @return [Event, true, false] self, or true/false if a timeout is used @!macro [attach] edge.periodical_wait
@note a thread should wait only once! For repeated checking use faster `completed?` check. If thread waits periodically it will dangerously grow the waiters stack.
# File lib/concurrent/edge/future.rb, line 240
# Touches the event, then blocks via wait_until_complete.
#
# @param timeout passed through to wait_until_complete
# @return the result of wait_until_complete when a timeout was given,
#   otherwise self
def wait(timeout = nil)
  touch
  completed_in_time = wait_until_complete(timeout)
  return completed_in_time if timeout

  self
end
@!visibility private only for debugging inspection
# File lib/concurrent/edge/future.rb, line 390 def waiting_threads @Waiters.each.to_a end
Changes the default executor for the rest of the chain. @return [Event]
# File lib/concurrent/edge/future.rb, line 320
# Wraps this event in an EventWrapperPromise bound to +executor+ and
# returns that promise's future.
def with_default_executor(executor)
  wrapper = EventWrapperPromise.new(self, executor)
  wrapper.future
end
Zip with future producing new Future @return [Event]
# File lib/concurrent/edge/future.rb, line 274
# Zips this event with +other+.
#
# @param other a Future, or any other object (handled as an event)
# @return the future of a ZipFutureEventPromise when +other+ is a Future,
#   otherwise the future of a ZipEventEventPromise
def zip(other)
  # `is?` is not a Ruby method; the type check has to be `is_a?`.
  if other.is_a?(Future)
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end
Private Instance Methods
# File lib/concurrent/edge/future.rb, line 418
# Runs callback_on_completion for +callback+ through with_async on
# +executor+.
def async_callback_on_completion(executor, callback)
  with_async(executor) do
    callback_on_completion(callback)
  end
end
# File lib/concurrent/edge/future.rb, line 430
# Dispatches a stored callback entry: sends +method+ with +args+ to self.
def call_callback(method, *args)
  send(method, *args)
end
# File lib/concurrent/edge/future.rb, line 434
# Pops entries off @Callbacks and invokes each via call_callback until a
# pop yields no method name.
def call_callbacks
  name, *arguments = @Callbacks.pop
  while name
    call_callback(name, *arguments)
    name, *arguments = @Callbacks.pop
  end
end
# File lib/concurrent/edge/future.rb, line 426
# Tells +promise+ that this event is done by calling its on_done with self.
def callback_notify_blocked(promise)
  promise.on_done(self)
end
# File lib/concurrent/edge/future.rb, line 422
# Invokes +callback+ without arguments and returns its result.
def callback_on_completion(callback)
  callback.call
end
@return [true, false]
# File lib/concurrent/edge/future.rb, line 397
# Blocks on @Condition (holding @Lock) until woken or until +timeout+
# elapses, unless the event is already completed.
#
# @param timeout passed to @Condition.wait
# @return [true, false] whether the event is completed afterwards
def wait_until_complete(timeout)
  return true if completed?

  @Lock.synchronize do
    @Waiters.increment
    begin
      @Condition.wait(@Lock, timeout) unless completed?
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end

  completed?
end
# File lib/concurrent/edge/future.rb, line 414
# Thin wrapper that hands +args+ and +block+ to Concurrent.post_on for
# +executor+.
def with_async(executor, *args, &block)
  Concurrent.post_on(
    executor,
    *args,
    &block
  )
end