class Concurrent::Promises::Channel

A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see {#pop_op} and {#push_op}. The capacity of the channel can be limited to support back pressure, use capacity option in {#initialize}. {#pop} method blocks ans {#pop_op} returns pending future if there is no message in the channel. If the capacity is limited the {#push} method blocks and {#push_op} returns pending future.

{include:file:docs-source/channel.out.md} @!macro warn.edge

Constants

ANY

An object which matches anything (with ===)

NOTHING
UNLIMITED_CAPACITY

Default capacity of the Channel, makes it accept unlimited number of messages.

Public Class Methods

new(capacity = UNLIMITED_CAPACITY) click to toggle source

Create channel. @param [Integer, UNLIMITED_CAPACITY] capacity the maximum number of messages which can be stored in the channel.

Calls superclass method
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 61
def initialize(capacity = UNLIMITED_CAPACITY)
  super()
  @Capacity = capacity
  @Mutex    = Mutex.new
  # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end
select(channels, timeout = nil) click to toggle source

@see select @return [::Array(Channel, Object), nil]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 319
def select(channels, timeout = nil)
  channels.first.select(channels[1..-1], timeout)
end
select_matching(matcher, channels, timeout = nil) click to toggle source

@see select_matching @return [::Array(Channel, Object), nil]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 337
def select_matching(matcher, channels, timeout = nil)
  channels.first.select_matching(matcher, channels[1..-1], timeout)
end
select_op(channels, probe = Promises.resolvable_future) click to toggle source

@see select_op @return [Future(::Array(Channel, Object))]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 313
def select_op(channels, probe = Promises.resolvable_future)
  channels.first.select_op(channels[1..-1], probe)
end
select_op_matching(matcher, channels, probe = Promises.resolvable_future) click to toggle source

@see select_op_matching @return [Future(::Array(Channel, Object))]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 331
def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  channels.first.select_op_matching(matcher, channels[1..-1], probe)
end
try_select(channels) click to toggle source

@see try_select @return [::Array(Channel, Object)]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 307
def try_select(channels)
  channels.first.try_select(channels[1..-1])
end
try_select_matching(matcher, channels) click to toggle source

@see try_select_matching @return [::Array(Channel, Object)]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 325
def try_select_matching(matcher, channels)
  channels.first.try_select_matching(matcher, channels[1..-1])
end

Public Instance Methods

<=>(other) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 36
def <=>(other)
  1
end
capacity() click to toggle source

@return [Integer] Maximum capacity of the Channel.

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 292
def capacity
  @Capacity
end
inspect()
Alias for: to_s
partial_select_op(matcher, probe) click to toggle source

@!visibility private

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 343
def partial_select_op(matcher, probe)
  @Mutex.synchronize { ns_pop_op(matcher, probe, true) }
end
peek(no_value = nil) click to toggle source

@!macro promises.channel.peek

Behaves as {#try_pop} but it does not remove the message from the channel
@param [Object] no_value returned when there is no message available
@return [Object, no_value] message or nil when there is no message
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 206
def peek(no_value = nil)
  peek_matching ANY, no_value
end
peek_matching(matcher, no_value = nil) click to toggle source

@!macro promises.channel.peek @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 212
def peek_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher, false
    return message if message != NOTHING
    message = ns_consume_pending_push matcher, false
    return message != NOTHING ? message : no_value
  end
end
pop(timeout = nil, timeout_value = nil) click to toggle source

@!macro promises.channel.pop

Blocks current thread until a message is available in the channel for popping.

@!macro channel.warn.blocks
@!macro channel.param.timeout
@param [Object] timeout_value a value returned by the method when it times out
@return [Object, nil] message or nil when timed out
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 174
def pop(timeout = nil, timeout_value = nil)
  pop_matching ANY, timeout, timeout_value
end
pop_matching(matcher, timeout = nil, timeout_value = nil) click to toggle source

@!macro promises.channel.pop @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 180
def pop_matching(matcher, timeout = nil, timeout_value = nil)
  # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer? Maybe only if the size is zero. It could be surprising if it's used as a throttle it might be expected that it will not pop if buffer is full of messages which di not match, it might it expected it will block until the message is added to the buffer
  # that it returns even if the buffer is full. User might expect that it has to be in the buffer first.
  probe = @Mutex.synchronize do
    message = ns_shift_message matcher
    if message == NOTHING
      message = ns_consume_pending_push matcher
      return message if message != NOTHING
    else
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
      return message
    end

    probe = Promises.resolvable_future
    @Probes.push probe, false, matcher
    probe
  end

  probe.value!(timeout, timeout_value, [true, timeout_value, nil])
end
pop_op(probe = Promises.resolvable_future) click to toggle source

@!macro promises.channel.pop_op

Returns a future witch will become fulfilled with a value from the channel when one is available.
@!macro chanel.operation_wait

@param [ResolvableFuture] probe the future which will be fulfilled with a channel value
@return [Future(Object)] the probe, its value will be the message when available.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 157
def pop_op(probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(ANY, probe, false) }
end
pop_op_matching(matcher, probe = Promises.resolvable_future) click to toggle source

@!macro promises.channel.pop_op @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 163
def pop_op_matching(matcher, probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(matcher, probe, false) }
end
push(message, timeout = nil) click to toggle source

Blocks current thread until the message is pushed into the channel.

@!macro channel.warn.blocks @param [Object] message @!macro channel.param.timeout @return [self, true, false] self implies timeout was not used, true implies timeout was used

and it was pushed, false implies it was not pushed within timeout.
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 117
def push(message, timeout = nil)
  pushed_op = @Mutex.synchronize do
    return timeout ? true : self if ns_try_push(message)

    pushed = Promises.resolvable_future
    # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages
    @PendingPush.push message, pushed
    pushed
  end

  result = pushed_op.wait!(timeout, [true, self, nil])
  result == pushed_op ? self : result
end
push_op(message) click to toggle source

Returns future which will fulfill when the message is pushed to the channel. @!macro chanel.operation_wait

If it is later waited on the operation with a timeout e.g.`channel.pop_op.wait(1)`
it will not prevent the channel to fulfill the operation later after the timeout.
The operation has to be either processed later
```ruby
pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end
```
or the operation can be prevented from completion after timing out by using
`channel.pop_op.wait(1, [true, nil, nil])`.
It will fulfill the operation on timeout preventing channel from doing the operation,
e.g. popping a message.

@param [Object] message @return [ResolvableFuture(self)]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 98
def push_op(message)
  @Mutex.synchronize do
    if ns_try_push(message)
      Promises.fulfilled_future self
    else
      pushed = Promises.resolvable_future
      @PendingPush.push message, pushed
      return pushed
    end
  end
end
select(channels, timeout = nil) click to toggle source

@!macro promises.channel.select

As {#select_op} but does not return future,
it block current thread instead until there is a message available
in the receiver or in any of the channels.

@!macro channel.warn.blocks
@param [Channel, ::Array<Channel>] channels
@!macro channel.param.timeout
@return [::Array(Channel, Object), nil] message or nil when timed out
@see #select_op
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 275
def select(channels, timeout = nil)
  select_matching ANY, channels, timeout
end
select_matching(matcher, channels, timeout = nil) click to toggle source

@!macro promises.channel.select @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 281
def select_matching(matcher, channels, timeout = nil)
  probe = select_op_matching(matcher, channels)
  probe.value!(timeout, nil, [true, nil, nil])
end
select_op(channels, probe = Promises.resolvable_future) click to toggle source

@!macro promises.channel.select_op

When message is available in the receiver or any of the provided channels
the future is fulfilled with a channel message pair.
The returned channel is the origin of the message.
@!macro chanel.operation_wait

@param [Channel, ::Array<Channel>] channels
@param [ResolvableFuture] probe the future which will be fulfilled with the message
@return [ResolvableFuture(::Array(Channel, Object))] a future which is fulfilled with
  pair [channel, message] when one of the channels is available for reading
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 254
def select_op(channels, probe = Promises.resolvable_future)
  select_op_matching ANY, channels, probe
end
select_op_matching(matcher, channels, probe = Promises.resolvable_future) click to toggle source

@!macro promises.channel.select_op @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 260
def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  [self, *channels].each { |ch| ch.partial_select_op matcher, probe }
  probe
end
size() click to toggle source

@return [Integer] The number of messages currently stored in the channel.

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 287
def size
  @Mutex.synchronize { @Messages.size }
end
to_s() click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 40
def to_s
  'unlimited'
end
Also aliased as: inspect
try_pop(no_value = nil) click to toggle source

@!macro promises.channel.try_pop

Pop a message from the channel if there is one available.
@param [Object] no_value returned when there is no message available
@return [Object, no_value] message or nil when there is no message
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 135
def try_pop(no_value = nil)
  try_pop_matching ANY, no_value
end
try_pop_matching(matcher, no_value = nil) click to toggle source

@!macro promises.channel.try_pop @!macro promises.channel.param.matcher

@param [#===] matcher only consider message which matches `matcher === a_message`
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 142
def try_pop_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher
    return message if message != NOTHING
    message = ns_consume_pending_push matcher
    return message != NOTHING ? message : no_value
  end
end
try_push(message) click to toggle source

Push the message into the channel if there is space available. @param [Object] message @return [true, false]

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 74
def try_push(message)
  @Mutex.synchronize { ns_try_push(message) }
end
try_select(channels) click to toggle source

@!macro promises.channel.try_select

If message is available in the receiver or any of the provided channels
the channel message pair is returned. If there is no message nil is returned.
The returned channel is the origin of the message.

@param [Channel, ::Array<Channel>] channels
@return [::Array(Channel, Object), nil]
  pair [channel, message] if one of the channels is available for reading
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 229
def try_select(channels)
  try_select_matching ANY, channels
end
try_select_matching(matcher, channels) click to toggle source

@!macro promises.channel.try_select @!macro promises.channel.param.matcher

# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 235
def try_select_matching(matcher, channels)
  message = nil
  channel = [self, *channels].find do |ch|
    message = ch.try_pop_matching(matcher, NOTHING)
    message != NOTHING
  end
  channel ? [channel, message] : nil
end

Private Instance Methods

ns_consume_pending_push(matcher, remove = true) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 394
def ns_consume_pending_push(matcher, remove = true)
  i = 0
  while true
    message, pushed = @PendingPush[i, 2]
    return NOTHING unless pushed

    if matcher === message
      resolved           = pushed.resolved?
      @PendingPush[i, 2] = [] if remove || resolved
      # can fail if timed-out, so try without error
      if remove ? pushed.fulfill(self, false) : !resolved
        # pushed fulfilled so actually push the message
        return message
      end
    end

    i += 2
  end
end
ns_pop_op(matcher, probe, include_channel) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 349
def ns_pop_op(matcher, probe, include_channel)
  message = ns_shift_message matcher

  # got message from buffer
  if message != NOTHING
    if probe.fulfill(include_channel ? [self, message] : message, false)
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
    else
      @Messages.unshift message
    end
    return probe
  end

  # no message in buffer, try to pair with a pending push
  i = 0
  while true
    message, pushed = @PendingPush[i, 2]
    break if pushed.nil?

    if matcher === message
      value = include_channel ? [self, message] : message
      if Promises::Resolvable.atomic_resolution(probe  => [true, value, nil],
                                                pushed => [true, self, nil])
        @PendingPush[i, 2] = []
        return probe
      end

      if probe.resolved?
        return probe
      end

      # so pushed.resolved? has to be true, remove the push
      @PendingPush[i, 2] = []
    end

    i += 2
  end

  # no push to pair with
  # TODO (pitr-ch 11-Jan-2019): clear up probes when timed out, use callback
  @Probes.push probe, include_channel, matcher if probe.pending?
  return probe
end
ns_shift_message(matcher, remove = true) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 434
def ns_shift_message(matcher, remove = true)
  i = 0
  while true
    message = @Messages.fetch(i, NOTHING)
    return NOTHING if message == NOTHING

    if matcher === message
      @Messages.delete_at i if remove
      return message
    end

    i += 1
  end
end
ns_try_push(message) click to toggle source
# File lib/concurrent-ruby-edge/concurrent/edge/channel.rb, line 414
def ns_try_push(message)
  i = 0
  while true
    probe, include_channel, matcher = @Probes[i, 3]
    break unless probe
    if matcher === message && probe.fulfill(include_channel ? [self, message] : message, false)
      @Probes[i, 3] = []
      return true
    end
    i += 3
  end

  if @Capacity > @Messages.size
    @Messages.push message
    true
  else
    false
  end
end