class Concurrent::Channel::Buffer::Buffered
A buffer with a fixed internal capacity. Items can be put onto the buffer without blocking until the internal capacity is reached. Once the buffer is at capacity, subsequent calls to {#put} will block until an item is removed from the buffer, creating spare capacity.
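The capacity rule described above can be illustrated with a minimal sketch. `SketchBuffer` below is a hypothetical stand-in written for this example; it is not the library class, and it only models the non-blocking insert and removal paths with a plain `Mutex`.

```ruby
# Hypothetical stand-in for illustration only; not Concurrent::Channel::Buffer::Buffered.
class SketchBuffer
  def initialize(capacity)
    @capacity = capacity
    @items = []
    @lock = Mutex.new
  end

  # Non-blocking insert: true if the item was stored, false if the buffer is full.
  def offer(item)
    @lock.synchronize do
      return false if @items.size == @capacity
      @items.push(item)
      true
    end
  end

  # Non-blocking removal: the oldest item, or nil when the buffer is empty.
  def poll
    @lock.synchronize { @items.shift }
  end
end

buffer = SketchBuffer.new(2)
results = [buffer.offer(:a), buffer.offer(:b), buffer.offer(:c)]
# results is [true, true, false]: the third item exceeds the capacity of 2

first = buffer.poll              # removes :a and frees one slot
after_poll = buffer.offer(:c)    # true, because there is spare capacity again
```

Items leave in the order they were added, so removing one item always frees exactly one slot for the next insert.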
Public Instance Methods
@!macro channel_buffer_next
# File lib/concurrent/channel/buffer/buffered.rb, line 56
def next
  loop do
    synchronize do
      if ns_closed? && ns_empty?
        return Concurrent::NULL, false
      elsif !ns_empty?
        item = buffer.shift
        return item, true
      end
    end
    Thread.pass
  end
end
@!macro channel_buffer_offer
New items can be put onto the buffer until the number of items in the buffer reaches the {#size} value specified during initialization. If the buffer is full or closed, {#offer} returns false immediately instead of blocking.
# File lib/concurrent/channel/buffer/buffered.rb, line 38
def offer(item)
  synchronize do
    if ns_closed? || ns_full?
      return false
    else
      ns_put_onto_buffer(item)
      return true
    end
  end
end
@!macro channel_buffer_poll
# File lib/concurrent/channel/buffer/buffered.rb, line 71
def poll
  synchronize do
    if ns_empty?
      Concurrent::NULL
    else
      buffer.shift
    end
  end
end
@!macro channel_buffer_put
New items can be put onto the buffer until the number of items in the buffer reaches the {#size} value specified during initialization. Once that limit is reached, {#put} blocks until an item is removed. It returns false if the buffer is closed.
# File lib/concurrent/channel/buffer/buffered.rb, line 19
def put(item)
  loop do
    synchronize do
      if ns_closed?
        return false
      elsif !ns_full?
        ns_put_onto_buffer(item)
        return true
      end
    end
    Thread.pass
  end
end
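The blocking behaviour of the retry loop above can be sketched with a producer thread. `BlockingSketchBuffer` is a hypothetical stand-in that mimics the same lock, check, `Thread.pass` pattern; it omits the closed state and is not the library class.

```ruby
# Hypothetical stand-in for illustration only; closed-state handling is omitted.
class BlockingSketchBuffer
  def initialize(capacity)
    @capacity = capacity
    @items = []
    @lock = Mutex.new
  end

  # Retries until there is spare capacity, yielding to other threads between attempts.
  def put(item)
    loop do
      @lock.synchronize do
        if @items.size < @capacity
          @items.push(item)
          return true
        end
      end
      Thread.pass
    end
  end

  # Non-blocking removal of the oldest item (nil when empty).
  def poll
    @lock.synchronize { @items.shift }
  end
end

buffer = BlockingSketchBuffer.new(1)
buffer.put(:first)                             # fills the only slot

producer = Thread.new { buffer.put(:second) }  # cannot finish while the buffer is full
sleep 0.05
still_blocked = producer.alive?                # the producer is still retrying

removed = buffer.poll                          # frees the slot
put_result = producer.value                    # waits for the producer to finish
remaining = buffer.poll
```

The lock is released between attempts, which is what lets the consumer's `poll` run while the producer is waiting.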
@!macro channel_buffer_take
# File lib/concurrent/channel/buffer/buffered.rb, line 50
def take
  item, _ = self.next
  item
end
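The relationship between the two-value return of `next` and the single value returned by `take` can be sketched as follows. `SketchQueue` and its `NULL` constant are hypothetical stand-ins (the latter for `Concurrent::NULL`), pre-loaded with items so the example never blocks.

```ruby
# Hypothetical stand-in for illustration only; not the library class.
class SketchQueue
  NULL = Object.new # sentinel standing in for Concurrent::NULL

  def initialize(items)
    @items = items.dup
    @closed = false
    @lock = Mutex.new
  end

  def close
    @lock.synchronize { @closed = true }
  end

  # Returns [item, true] while items remain, or [NULL, false] once closed and drained.
  def next
    loop do
      @lock.synchronize do
        return NULL, false if @closed && @items.empty?
        return @items.shift, true unless @items.empty?
      end
      Thread.pass
    end
  end

  # Same as #next, but discards the second value.
  def take
    item, _ = self.next
    item
  end
end

queue = SketchQueue.new([:a, :b])
queue.close

first = queue.next   # [:a, true]: closing does not discard buffered items
second = queue.take  # :b
last = queue.next    # [SketchQueue::NULL, false]: closed and empty
```

The boolean lets a caller distinguish a real item from the sentinel returned after the buffer has been closed and drained.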
Private Instance Methods
@!macro channel_buffer_empty_question
# File lib/concurrent/channel/buffer/buffered.rb, line 100
def ns_empty?
  ns_size == 0
end
@!macro channel_buffer_full_question
# File lib/concurrent/channel/buffer/buffered.rb, line 105
def ns_full?
  ns_size == capacity
end
@!macro channel_buffer_initialize
@param [Integer] size the maximum capacity of the buffer; must be
greater than zero.
@raise [ArgumentError] when the size is zero (0) or less.
# File lib/concurrent/channel/buffer/buffered.rb, line 88
def ns_initialize(size)
  raise ArgumentError.new('size must be greater than 0') if size.to_i <= 0
  self.capacity = size.to_i
  self.buffer = []
end
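Because the guard above calls `to_i` on its argument, values other than plain integers are coerced before the check. The sketch below isolates that check in a hypothetical helper, `checked_capacity`, to show which inputs pass and which are rejected; the helper is not part of the library.

```ruby
# Hypothetical helper mirroring the size check; not part of the library.
def checked_capacity(size)
  capacity = size.to_i
  raise ArgumentError, 'size must be greater than 0' if capacity <= 0
  capacity
end

accepted = [checked_capacity(3), checked_capacity('4'), checked_capacity(2.9)]
# accepted is [3, 4, 2]: strings are parsed and floats are truncated

# Each of these coerces to zero or less, so each raises ArgumentError.
rejected = [0, -1, nil, 'abc', 0.5].map do |bad|
  begin
    checked_capacity(bad)
    false
  rescue ArgumentError
    true
  end
end
# rejected is [true, true, true, true, true]
```

Note that `nil`, non-numeric strings, and fractions below one all coerce to zero, so they are rejected just like an explicit `0`.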
@!macro channel_buffer_put
# File lib/concurrent/channel/buffer/buffered.rb, line 110
def ns_put_onto_buffer(item)
  buffer.push(item)
end
@!macro channel_buffer_size_reader
# File lib/concurrent/channel/buffer/buffered.rb, line 95
def ns_size
  buffer.size
end