class Concurrent::Channel::Buffer::Buffered

A buffer with a fixed internal capacity. Items can be put onto the buffer without blocking until the internal capacity is reached. Once the buffer is at capacity, subsequent calls to {#put} will block until an item is removed from the buffer, creating spare capacity.

Public Instance Methods

next() click to toggle source

@!macro channel_buffer_next

# File lib/concurrent/channel/buffer/buffered.rb, line 56
# Remove and return the item at the front of the buffer, waiting until one
# is available.
#
# Each attempt runs inside `synchronize`; between attempts the thread
# yields with `Thread.pass` and tries again.
#
# @return [Array(Object, Boolean)] `[item, true]` when an item was removed,
#   or `[Concurrent::NULL, false]` once the buffer is both closed and empty.
def next
  loop do
    synchronize do
      # Anything still buffered is handed out, whether or not the buffer is closed.
      return buffer.shift, true unless ns_empty?
      # Empty and closed: nothing further can be returned.
      return Concurrent::NULL, false if ns_closed?
    end
    # Empty but still open: give other threads a chance to put an item.
    Thread.pass
  end
end
offer(item) click to toggle source

@!macro channel_buffer_offer

New items can be put onto the buffer until the number of items in the buffer reaches the {#size} value specified during initialization.

# File lib/concurrent/channel/buffer/buffered.rb, line 38
# Try to add an item to the buffer without waiting.
#
# The item is rejected when the buffer is closed or already full; both
# checks and the insertion happen inside a single `synchronize` call.
#
# @param item [Object] the value to add.
# @return [Boolean] true when the item was added, false otherwise.
def offer(item)
  synchronize do
    accepted = !(ns_closed? || ns_full?)
    ns_put_onto_buffer(item) if accepted
    return accepted
  end
end
poll() click to toggle source

@!macro channel_buffer_poll

# File lib/concurrent/channel/buffer/buffered.rb, line 71
# Take the item at the front of the buffer without waiting.
#
# @return [Object] the removed item, or `Concurrent::NULL` when the buffer
#   holds nothing.
def poll
  synchronize { ns_empty? ? Concurrent::NULL : buffer.shift }
end
put(item) click to toggle source

@!macro channel_buffer_put

New items can be put onto the buffer until the number of items in the buffer reaches the {#size} value specified during initialization.

# File lib/concurrent/channel/buffer/buffered.rb, line 19
# Add an item to the buffer, waiting for spare room when it is full.
#
# Each attempt runs inside `synchronize`. While the buffer is full the
# thread yields with `Thread.pass` and retries.
#
# @param item [Object] the value to add.
# @return [Boolean] true once the item has been added; false if the buffer
#   is found closed on any attempt.
def put(item)
  loop do
    synchronize do
      return false if ns_closed?

      unless ns_full?
        ns_put_onto_buffer(item)
        return true
      end
    end
    # Still full: let other threads run (e.g. one that removes an item).
    Thread.pass
  end
end
take() click to toggle source

@!macro channel_buffer_take

# File lib/concurrent/channel/buffer/buffered.rb, line 50
# Take the next item, discarding the second value returned by {#next}.
#
# @return [Object] the first of the values returned by {#next}.
def take
  # `next` is a Ruby keyword, so the method needs the explicit `self.` receiver.
  taken, _more = self.next
  taken
end

Private Instance Methods

ns_empty?() click to toggle source

@!macro channel_buffer_empty_question

# File lib/concurrent/channel/buffer/buffered.rb, line 100
# Whether the buffer currently holds no items, judged by `ns_size`.
#
# @return [Boolean] true when `ns_size` is zero.
def ns_empty?
  ns_size.zero?
end
ns_full?() click to toggle source

@!macro channel_buffer_full_question

# File lib/concurrent/channel/buffer/buffered.rb, line 105
# Whether the buffer has reached its capacity.
#
# @return [Boolean] true when `ns_size` equals `capacity`.
def ns_full?
  stored = ns_size
  stored == capacity
end
ns_initialize(size) click to toggle source

@!macro channel_buffer_initialize

@param [Integer] size the maximum capacity of the buffer; must be greater than zero.

@raise [ArgumentError] when the size is zero (0) or less.

# File lib/concurrent/channel/buffer/buffered.rb, line 88
# Set the buffer's capacity and start it with an empty item list.
#
# @param size [Integer, #to_i] the maximum number of items the buffer may
#   hold; it is converted with `to_i` and must come out greater than zero.
# @raise [ArgumentError] when the converted size is zero or less.
def ns_initialize(size)
  # Convert once so the validated value is exactly the one stored.
  limit = size.to_i
  raise ArgumentError, 'size must be greater than 0' if limit <= 0

  self.capacity = limit
  self.buffer = []
end
ns_put_onto_buffer(item) click to toggle source

@!macro channel_buffer_put

# File lib/concurrent/channel/buffer/buffered.rb, line 110
# Append an item to the end of the internal item list.
#
# No capacity or closed check is made here.
#
# @param item [Object] the value to append.
# @return [Object] whatever the append on `buffer` returns (the list itself
#   when `buffer` is an Array).
def ns_put_onto_buffer(item)
  buffer << item
end
ns_size() click to toggle source

@!macro channel_buffer_size_reader

# File lib/concurrent/channel/buffer/buffered.rb, line 95
# Number of items currently held in the internal item list.
#
# @return [Integer] the current item count of `buffer`.
def ns_size
  buffer.length
end