class Concurrent::Edge::Future
Represents a value that will become available in the future. It may instead fail with a reason.
Public Instance Methods
@!visibility private
# File lib/concurrent/edge/future.rb, line 767
# Registers a callback, or invokes it right away when the future is already completed.
#
# @param method [Symbol] name of the callback method dispatched through `call_callback`
# @param args extra arguments handed to the callback method after the state
# @return self
def add_callback(method, *args)
  current = internal_state
  if completed?(current)
    call_callback(method, current, *args)
    return self
  end

  @Callbacks.push([method, *args])
  # Re-read the state after pushing: take the callback back (by running the
  # queued callbacks) if the future was completed in the meanwhile.
  latest = internal_state
  call_callbacks(latest) if completed?(latest)
  self
end
@return [Future] which has first completed value from futures
# File lib/concurrent/edge/future.rb, line 672
# Builds an AnyCompletePromise over this future followed by +futures+,
# using @DefaultExecutor, and returns that promise's future.
#
# @param futures additional futures to combine with self
# @return the future of the newly created AnyCompletePromise
def any(*futures)
  candidates = [self, *futures]
  AnyCompletePromise.new(candidates, @DefaultExecutor).future
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 781
# Delegates to `apply` on the current internal state, passing +block+ through.
#
# @param block the callable forwarded to the internal state
# @return whatever the internal state's `apply` returns
def apply(block)
  state = internal_state
  state.apply(block)
end
# File lib/concurrent/edge/future.rb, line 652
# Registers an `on_completion!` callback that completes +completable_future+
# with this future's internal state (read at the time the callback runs).
#
# @param completable_future object responding to `complete_with`
# @return whatever `on_completion!` returns
def chain_completable(completable_future)
  on_completion! do
    completable_future.complete_with(internal_state)
  end
end
@!visibility private
# File lib/concurrent/edge/future.rb, line 749
# Attempts to move the future from PENDING to +state+.
#
# @param state the state to install
# @param raise_on_reassign [Boolean] when the compare-and-set fails: raise if
#   true, otherwise return false
# @raise [Concurrent::MultipleAssignmentError] when the compare-and-set fails
#   and +raise_on_reassign+ is truthy
# @return self on success, false when the compare-and-set fails without raising
def complete_with(state, raise_on_reassign = true)
  unless compare_and_set_internal_state(PENDING, state)
    return false unless raise_on_reassign

    log ERROR, 'Edge::Future', reason if reason # print otherwise hidden error
    message = "Future can be completed only once. Current result is #{result}, " \
              "trying to set #{state.result}"
    raise Concurrent::MultipleAssignmentError.new(message)
  end

  # go to synchronized block only if there were waiting threads
  @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
  call_callbacks(state)
  self
end
Inserts a delay into the chain of Futures, making the rest of the chain lazily evaluated. @return [Future]
# File lib/concurrent/edge/future.rb, line 678
# Zips this future with the future of a fresh DelayPromise (both created with
# @DefaultExecutor) and returns the resulting future.
#
# @return the future of the new ZipFutureEventPromise
def delay
  delayed = DelayPromise.new(@DefaultExecutor).future
  ZipFutureEventPromise.new(self, delayed, @DefaultExecutor).future
end
@example allows a failed Future to be raised
raise Concurrent.future.fail
# File lib/concurrent/edge/future.rb, line 629
# Produces the exception object for a failed future.
#
# When the stored reason is an Array, every element is logged at ERROR level
# and a generic Concurrent::Error is returned; otherwise the reason's own
# `exception(*args)` is returned.
#
# @param args arguments forwarded to the reason's `exception` method
# @raise [RuntimeError] 'obligation is not failed' when the future is not failed
# @return the exception object described above
def exception(*args)
  raise 'obligation is not failed' unless failed?

  failure = internal_state.reason
  return failure.exception(*args) unless failure.is_a?(::Array)

  failure.each { |error| log(ERROR, 'Edge::Future', error) }
  Concurrent::Error.new('multiple exceptions, inspect log')
end
Has the Future failed? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 570
# Tells whether +state+ is completed and not successful.
#
# @param state the state to inspect; defaults to the current internal state
# @return the result of `state.completed? && !state.success?`
def failed?(state = internal_state)
  completed = state.completed?
  completed && !state.success?
end
zips with the Future in the value @example
Concurrent.future { Concurrent.future { 1 } }.flat.value # => 1
# File lib/concurrent/edge/future.rb, line 667
# Creates a FlatPromise for this future with the given +level+ and
# @DefaultExecutor, and returns its future.
#
# @param level passed through to FlatPromise (defaults to 1)
# @return the future of the new FlatPromise
def flat(level = 1)
  promise = FlatPromise.new(self, level, @DefaultExecutor)
  promise.future
end
# File lib/concurrent/edge/future.rb, line 563
# Deprecated alias of `success?`: reports the deprecation through
# `deprecated_method`, then delegates.
#
# @return the result of `success?`
def fulfilled?
  deprecated_method('fulfilled?', 'success?')
  success?
end
@yield [reason] executed async on `executor` when failed? @return self
# File lib/concurrent/edge/future.rb, line 732
# Registers +callback+ under :async_callback_on_failure, to run on +executor+
# or on @DefaultExecutor when no executor is given.
#
# @param executor executor for the callback; nil selects @DefaultExecutor
# @return the result of `add_callback`
def on_failure(executor = nil, &callback)
  target = executor || @DefaultExecutor
  add_callback(:async_callback_on_failure, target, callback)
end
@yield [reason] executed sync when failed? @return self
# File lib/concurrent/edge/future.rb, line 744
# Registers +callback+ under :callback_on_failure (no executor is involved).
#
# @return the result of `add_callback`
def on_failure!(&callback)
  add_callback(:callback_on_failure, callback)
end
@yield [value] executed async on `executor` when success @return self
# File lib/concurrent/edge/future.rb, line 726
# Registers +callback+ under :async_callback_on_success, to run on +executor+
# or on @DefaultExecutor when no executor is given.
#
# @param executor executor for the callback; nil selects @DefaultExecutor
# @return the result of `add_callback`
def on_success(executor = nil, &callback)
  target = executor || @DefaultExecutor
  add_callback(:async_callback_on_success, target, callback)
end
@yield [value] executed sync when success @return self
# File lib/concurrent/edge/future.rb, line 738
# Registers +callback+ under :callback_on_success (no executor is involved).
#
# @return the result of `add_callback`
def on_success!(&callback)
  add_callback(:callback_on_success, callback)
end
@return [Exception, nil] the reason of the Future's failure @!macro edge.timeout_nil @!macro edge.periodical_wait
# File lib/concurrent/edge/future.rb, line 592
# Calls `touch`, waits for completion, then reads the reason from the internal state.
#
# @param timeout passed to `wait_until_complete`; nil by default
# @return the internal state's reason, or nil when `wait_until_complete`
#   returns a falsy value
def reason(timeout = nil)
  touch
  return unless wait_until_complete(timeout)

  internal_state.reason
end
# File lib/concurrent/edge/future.rb, line 574
# Deprecated alias of `failed?`: reports the deprecation through
# `deprecated_method`, then delegates.
#
# @return the result of `failed?`
def rejected?
  deprecated_method('rejected?', 'failed?')
  failed?
end
@yield [reason] executed only on parent failure @return [Future]
# File lib/concurrent/edge/future.rb, line 660
# Creates a RescuePromise chained on this future and returns its future.
#
# @param executor executor for +callback+; nil selects @DefaultExecutor
# @return the future of the new RescuePromise
def rescue(executor = nil, &callback)
  target = executor || @DefaultExecutor
  promise = RescuePromise.new(self, @DefaultExecutor, target, &callback)
  promise.future
end
@return [Array(Boolean, Object, Exception), nil] triplet of success, value, reason @!macro edge.timeout_nil @!macro edge.periodical_wait
# File lib/concurrent/edge/future.rb, line 600
# Calls `touch`, waits for completion, then reads the result from the internal state.
#
# @param timeout passed to `wait_until_complete`; nil by default
# @return the internal state's result, or nil when `wait_until_complete`
#   returns a falsy value
def result(timeout = nil)
  touch
  return unless wait_until_complete(timeout)

  internal_state.result
end
Schedules the rest of the chain for execution after the specified time interval or at the specified time. @return [Future]
# File lib/concurrent/edge/future.rb, line 684
# Chains a step that zips this future with the event of a ScheduledPromise
# created for +intended_time+, then flattens the chained future.
#
# @param intended_time passed through to ScheduledPromise
# @return the result of calling `flat` on the chained future
def schedule(intended_time)
  scheduled = chain do
    timer = ScheduledPromise.new(@DefaultExecutor, intended_time).event
    ZipFutureEventPromise.new(self, timer, @DefaultExecutor).future
  end
  scheduled.flat
end
Has the Future succeeded? @return [Boolean]
# File lib/concurrent/edge/future.rb, line 559
# Tells whether +state+ is completed and successful.
#
# @param state the state to inspect; defaults to the current internal state
# @return the result of `state.completed? && state.success?`
def success?(state = internal_state)
  completed = state.completed?
  completed && state.success?
end
@yield [value] executed only on parent success @return [Future]
# File lib/concurrent/edge/future.rb, line 642
# Creates a ThenPromise chained on this future and returns its future.
#
# @param executor executor for +callback+; nil selects @DefaultExecutor
# @return the future of the new ThenPromise
def then(executor = nil, &callback)
  target = executor || @DefaultExecutor
  promise = ThenPromise.new(self, @DefaultExecutor, target, &callback)
  promise.future
end
Asks the actor with its value. @return [Future] new future with the response from the actor
# File lib/concurrent/edge/future.rb, line 648
# Chains a step that passes the value to `actor.ask`, then flattens the
# chained future.
#
# @param actor object responding to `ask`
# @return the result of calling `flat` on the chained future
def then_ask(actor)
  asked = self.then { |value| actor.ask(value) }
  asked.flat
end
@note may block @note only proof of concept
# File lib/concurrent/edge/future.rb, line 720
# Registers an `on_success` callback on the :io executor that puts the value
# into +channel+.
#
# @param channel object responding to `put`
# @return the result of `on_success`
def then_put(channel)
  on_success(:io) do |value|
    channel.put(value)
  end
end
Zips with the selected value from the supplied channels. @return [Future]
# File lib/concurrent/edge/future.rb, line 694
# Zips this future with the result of `Concurrent.select(*channels)` through a
# ZipFuturesPromise and returns its future.
#
# @param channels channels forwarded to `Concurrent.select`
# @return the future of the new ZipFuturesPromise
def then_select(*channels)
  selected = Concurrent.select(*channels)
  ZipFuturesPromise.new([self, selected], @DefaultExecutor).future
end
@return [Object, nil] the value of the Future when success, nil on timeout @!macro [attach] edge.timeout_nil
@note If the Future can have the value `nil`, it cannot be distinguished from the `nil` returned on timeout. In this case it is better to first use `wait` and then `value` (or similar).
@!macro edge.periodical_wait
# File lib/concurrent/edge/future.rb, line 584
# Calls `touch`, waits for completion, then reads the value from the internal state.
#
# @param timeout passed to `wait_until_complete`; nil by default
# @return the internal state's value, or nil when `wait_until_complete`
#   returns a falsy value
def value(timeout = nil)
  touch
  return unless wait_until_complete(timeout)

  internal_state.value
end
Waits until the Future is complete and returns its value. @param [Numeric] timeout the maximum time in seconds to wait. @raise reason on failure @return [Object, nil] @!macro edge.timeout_nil @!macro edge.periodical_wait
# File lib/concurrent/edge/future.rb, line 622
# Calls `touch`, waits through `wait_until_complete!` (which raises when the
# future is failed), then reads the value from the internal state.
#
# @param timeout passed to `wait_until_complete!`; nil by default
# @return the internal state's value, or nil when `wait_until_complete!`
#   returns a falsy value
def value!(timeout = nil)
  touch
  return unless wait_until_complete!(timeout)

  internal_state.value
end
Waits until the Future is complete. @param [Numeric] timeout the maximum time in seconds to wait. @raise reason on failure @return [Event, true, false] self or true/false if timeout is used @!macro edge.periodical_wait
# File lib/concurrent/edge/future.rb, line 610
# Calls `touch` and waits through `wait_until_complete!` (which raises when
# the future is failed).
#
# @param timeout passed to `wait_until_complete!`; nil by default
# @return the wait result when +timeout+ is truthy, otherwise self
def wait!(timeout = nil)
  touch
  completed = wait_until_complete!(timeout)
  return completed if timeout

  self
end
Changes the default executor for the rest of the chain. @return [Future]
# File lib/concurrent/edge/future.rb, line 700
# Wraps this future in a FutureWrapperPromise created with +executor+ and
# returns the wrapper's future.
#
# @param executor executor handed to FutureWrapperPromise
# @return the future of the new FutureWrapperPromise
def with_default_executor(executor)
  wrapper = FutureWrapperPromise.new(self, executor)
  wrapper.future
end
Zips with another future, producing a new Future. @return [Future]
# File lib/concurrent/edge/future.rb, line 706
# Zips this future with +other+: a ZipFutureFuturePromise is used when +other+
# is a Future, a ZipFutureEventPromise otherwise.
#
# @param other the object to zip with
# @return the future of the chosen promise
def zip(other)
  promise_class = other.is_a?(Future) ? ZipFutureFuturePromise : ZipFutureEventPromise
  promise_class.new(self, other, @DefaultExecutor).future
end
Private Instance Methods
# File lib/concurrent/edge/future.rb, line 833
# Runs `callback_on_completion` through `with_async` on +executor+, using the
# state and callback that `with_async` yields.
def async_callback_on_completion(state, executor, callback)
  with_async(executor, state, callback) { |yielded_state, yielded_callback| callback_on_completion(yielded_state, yielded_callback) }
end
# File lib/concurrent/edge/future.rb, line 811
# Runs `callback_on_failure` through `with_async` on +executor+, using the
# state and callback that `with_async` yields.
def async_callback_on_failure(state, executor, callback)
  with_async(executor, state, callback) { |yielded_state, yielded_callback| callback_on_failure(yielded_state, yielded_callback) }
end
# File lib/concurrent/edge/future.rb, line 805
# Runs `callback_on_success` through `with_async` on +executor+, using the
# state and callback that `with_async` yields.
def async_callback_on_success(state, executor, callback)
  with_async(executor, state, callback) { |yielded_state, yielded_callback| callback_on_success(yielded_state, yielded_callback) }
end
# File lib/concurrent/edge/future.rb, line 801
# Dispatches to the callback method named +method+ on this object, passing
# +state+ followed by +args+.
#
# @return whatever the dispatched method returns
def call_callback(method, state, *args)
  send(method, state, *args)
end
# File lib/concurrent/edge/future.rb, line 793
# Pops entries off @Callbacks and invokes each through `call_callback` with
# +state+, stopping once a popped entry yields no method name.
#
# @param state the state handed to every callback
# @return [nil]
def call_callbacks(state)
  name, *arguments = @Callbacks.pop
  while name
    call_callback(name, state, *arguments)
    name, *arguments = @Callbacks.pop
  end
end
# File lib/concurrent/edge/future.rb, line 829
# Forwards to the superclass implementation with +promise+ only; +state+ is
# accepted but not passed on.
def callback_notify_blocked(state, promise)
  super(promise)
end
# File lib/concurrent/edge/future.rb, line 825
# Calls +callback+ with `state.result`.
#
# @return whatever +callback+ returns
def callback_on_completion(state, callback)
  outcome = state.result
  callback.call(outcome)
end
# File lib/concurrent/edge/future.rb, line 821
# Applies +callback+ to +state+ only when the state is not successful.
#
# @return the result of `state.apply`, or nil when the state is successful
def callback_on_failure(state, callback)
  return if state.success?

  state.apply(callback)
end
# File lib/concurrent/edge/future.rb, line 817
# Applies +callback+ to +state+ only when the state is successful.
#
# @return the result of `state.apply`, or nil when the state is not successful
def callback_on_success(state, callback)
  return unless state.success?

  state.apply(callback)
end
# File lib/concurrent/edge/future.rb, line 787
# Waits through `wait_until_complete`, then raises this future (Ruby obtains
# the exception object via `exception`) if it is failed.
#
# @param timeout passed to `wait_until_complete`; nil by default
# @return the value returned by `wait_until_complete`
def wait_until_complete!(timeout = nil)
  completed = wait_until_complete(timeout)
  raise self if failed?

  completed
end