class Concurrent::Channel::Buffer::Base
Abstract base class for all Channel buffers.
{Concurrent::Channel} objects maintain an internal, queue-like object called a buffer. It's the storage bin for values put onto or taken from the channel. Different buffer types have different characteristics. Subsequently, the behavior of any given channel is highly dependent uping the type of its buffer. This is the base class which defines the common buffer interface. Any class intended to be used as a channel buffer should extend this class.
Attributes
@!macro [attach] channel_buffer_capacity_reader
The maximum number of values which can be {#put} onto the buffer it becomes full.
Public Class Methods
@!macro [attach] channel_buffer_initialize
Creates a new buffer.
# File lib/concurrent/channel/buffer/base.rb, line 27 def initialize(*args) super() synchronize do @closed = false @size = 0 @capacity = 0 @buffer = nil ns_initialize(*args) end end
Public Instance Methods
@!macro [attach] channel_buffer_blocking_question
Predicate indicating if this buffer will block {#put} operations once it reaches its maximum capacity. @return [Boolean] true if this buffer blocks else false
# File lib/concurrent/channel/buffer/base.rb, line 44 def blocking? true end
@!macro [attach] channel_buffer_close
Close the buffer, preventing new items from being added. Once a buffer is closed it cannot be opened again. @return [Boolean] true if the buffer was open and successfully closed else false.
# File lib/concurrent/channel/buffer/base.rb, line 176 def close synchronize do @closed ? false : @closed = true end end
@!macro [attach] channel_buffer_closed_question
Predicate indicating is this buffer closed. @return [Boolea] true when closed else false.
# File lib/concurrent/channel/buffer/base.rb, line 187 def closed? synchronize { ns_closed? } end
@!macro [attach] channel_buffer_empty_question
Predicate indicating if the buffer is empty. @return [Boolean] true if this buffer is empty else false
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 62 def empty? synchronize { ns_empty? } end
@!macro [attach] channel_buffer_full_question
Predicate indicating if the buffer is full. @return [Boolean] true if this buffer is full else false
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 73 def full? synchronize { ns_full? } end
@!macro [attach] channel_buffer_next
Take the next "item" from the buffer and also return a boolean indicating if "more" items can be taken. Used for iterating over a buffer until it is closed and empty. If the buffer is open but no items remain the calling thread will block until an item is available. The second of the two return values, "more" (a boolean), will always be `true` when the buffer is open. The "more" value will be `false` when the channel has been closed and all values have already been received. When "more" is false the returned item will be `Concurrent::NULL`. Note that when multiple threads access the same channel a race condition can occur when using this method. A call to `next` from one thread may return `true` for the second return value, but another thread may `take` the last value before the original thread makes another call. Code which iterates over a channel must be programmed to properly handle these race conditions. @return [Object, Boolean] the first return value will be the item taken from the buffer and the second return value will be a boolean indicating whether or not more items remain.
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 151 def next raise NotImplementedError end
@!macro [attach] channel_buffer_offer
Put an item onto the buffer is possible. If the buffer is open but unable to add an item, probably due to being full, the method will return immediately. Similarly, the method will return immediately when the buffer is closed. A return value of `false` does not necessarily indicate that the buffer is closed, just that the item could not be added. @param [Object] item the item/value to put onto the buffer. @return [Boolean] true if the item was added to the buffer else false (always false when closed).
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 106 def offer(item) raise NotImplementedError end
@!macro [attach] channel_buffer_poll
Take the next item from the buffer if one is available else return immediately. Failing to return a value does not necessarily indicate that the buffer is closed, just that it is empty. @return [Object] the next item from the buffer or `Concurrent::NULL` if the buffer is empty.
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 165 def poll raise NotImplementedError end
@!macro [attach] channel_buffer_put
Put an item onto the buffer if possible. If the buffer is open but not able to accept the item the calling thread will block until the item can be put onto the buffer. @param [Object] item the item/value to put onto the buffer. @return [Boolean] true if the item was added to the buffer else false (always false when closed).
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 88 def put(item) raise NotImplementedError end
@!macro [attach] channel_buffer_size_reader
The number of items currently in the buffer.
# File lib/concurrent/channel/buffer/base.rb, line 51 def size synchronize { ns_size } end
@!macro [attach] channel_buffer_take
Take an item from the buffer if one is available. If the buffer is open and no item is available the calling thread will block until an item is available. If the buffer is closed but items are available the remaining items can still be taken. Once the buffer closes, no remaining items can be taken. @return [Object] the item removed from the buffer; `Concurrent::NULL` once the buffer has closed.
@raise [NotImplementedError] until overridden in a subclass.
# File lib/concurrent/channel/buffer/base.rb, line 122 def take raise NotImplementedError end
Private Instance Methods
# File lib/concurrent/channel/buffer/base.rb, line 193 def buffer @buffer end
# File lib/concurrent/channel/buffer/base.rb, line 197 def buffer=(value) @buffer = value end
# File lib/concurrent/channel/buffer/base.rb, line 205 def capacity=(value) @capacity = value end
# File lib/concurrent/channel/buffer/base.rb, line 201 def closed=(value) @closed = value end
@!macro channel_buffer_closed_question
# File lib/concurrent/channel/buffer/base.rb, line 232 def ns_closed? @closed end
@!macro channel_buffer_empty_question
# File lib/concurrent/channel/buffer/base.rb, line 222 def ns_empty? raise NotImplementedError end
@!macro channel_buffer_full_question
# File lib/concurrent/channel/buffer/base.rb, line 227 def ns_full? raise NotImplementedError end
# File lib/concurrent/channel/buffer/base.rb, line 213 def ns_initialize(*args) end
@!macro channel_buffer_size_reader
# File lib/concurrent/channel/buffer/base.rb, line 217 def ns_size raise NotImplementedError end
# File lib/concurrent/channel/buffer/base.rb, line 209 def size=(value) @size = value end