class Concurrent::Channel::Buffer::Unbuffered

A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be put onto the buffer when a thread is waiting to put. When either {#put} or {#take} is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made an exchange will occur and the first blocked call will return.

Public Instance Methods

empty?() click to toggle source

@!macro channel_buffer_empty_question

# File lib/concurrent/channel/buffer/unbuffered.rb, line 27
def empty?
  size == 0
end
full?() click to toggle source

@!macro channel_buffer_full_question

# File lib/concurrent/channel/buffer/unbuffered.rb, line 32
def full?
  !empty?
end
next() click to toggle source

@!macro channel_buffer_next

Items can only be taken from the buffer when one or more threads are waiting to {#put} items onto the buffer. This method exhibits the same blocking behavior as {#take}.

@see {#take}

# File lib/concurrent/channel/buffer/unbuffered.rb, line 135
def next
  item = take
  more = (item != Concurrent::NULL)
  return item, more
end
offer(item) click to toggle source

@!macro channel_buffer_offer

Items can only be put onto the buffer when one or more threads are waiting to {#take} items off the buffer. When there is a thread waiting to take an item this method will give its item and return `true` immediately. When there are no threads waiting to take or the buffer is closed, this method will return `false` immediately.

# File lib/concurrent/channel/buffer/unbuffered.rb, line 71
def offer(item)
  synchronize do
    return false if ns_closed? || taking.empty?

    taken = taking.shift
    taken.value = item
    true
  end
end
poll() click to toggle source

@!macro channel_buffer_poll

Items can only be taken off the buffer when one or more threads are waiting to {#put} items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return `Concurrent::NULL` immediately.

# File lib/concurrent/channel/buffer/unbuffered.rb, line 117
def poll
  synchronize do
    return Concurrent::NULL if putting.empty?

    put = putting.shift
    value = put.value
    put.value = nil
    value
  end
end
put(item) click to toggle source

@!macro channel_buffer_put

Items can only be put onto the buffer when one or more threads are waiting to {#take} items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls `take` the exchange will occur and this method will return.

# File lib/concurrent/channel/buffer/unbuffered.rb, line 44
def put(item)
  mine = synchronize do
    return false if ns_closed?

    ref = Concurrent::AtomicReference.new(item)
    if taking.empty?
      putting.push(ref)
    else
      taken = taking.shift
      taken.value = item
      ref.value = nil
    end
    ref
  end
  loop do
    return true if mine.value.nil?
    Thread.pass
  end
end
size() click to toggle source

@!macro channel_buffer_size_reader

# File lib/concurrent/channel/buffer/unbuffered.rb, line 20
def size
  synchronize do
    putting.empty? ? 0 : 1
  end
end
take() click to toggle source

@!macro channel_buffer_take

Items can only be taken from the buffer when one or more threads are waiting to {#put} items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls `pur` the exchange will occur and this method will return.

# File lib/concurrent/channel/buffer/unbuffered.rb, line 89
def take
  mine = synchronize do
    return Concurrent::NULL if ns_closed? && putting.empty?

    ref = Concurrent::AtomicReference.new(nil)
    if putting.empty?
      taking.push(ref)
    else
      put = putting.shift
      ref.value = put.value
      put.value = nil
    end
    ref
  end
  loop do
    item = mine.value
    return item if item
    Thread.pass
  end
end

Private Instance Methods

ns_initialize() click to toggle source

@!macro channel_buffer_initialize

# File lib/concurrent/channel/buffer/unbuffered.rb, line 148
def ns_initialize
  # one will always be empty
  @putting = []
  @taking = []
  self.closed = false
  self.capacity = 1
end
putting() click to toggle source
# File lib/concurrent/channel/buffer/unbuffered.rb, line 143
def putting() @putting; end
taking() click to toggle source
# File lib/concurrent/channel/buffer/unbuffered.rb, line 145
def taking() @taking; end