class Concurrent::Channel::Buffer::Unbuffered
A blocking buffer with a size of zero. An item can only be put onto the buffer when a thread is waiting to take. Similarly, an item can only be put onto the buffer when a thread is waiting to put. When either {#put} or {#take} is called and there is no corresponding call in progress, the call will block indefinitely. Any other calls to the same method will queue behind the first call and block as well. As soon as a corresponding put/take call is made an exchange will occur and the first blocked call will return.
Public Instance Methods
@!macro channel_buffer_empty_question
# File lib/concurrent/channel/buffer/unbuffered.rb, line 27 def empty? size == 0 end
@!macro channel_buffer_full_question
# File lib/concurrent/channel/buffer/unbuffered.rb, line 32 def full? !empty? end
@!macro channel_buffer_next
Items can only be taken from the buffer when one or more threads are waiting to {#put} items onto the buffer. This method exhibits the same blocking behavior as {#take}.
@see {#take}
# File lib/concurrent/channel/buffer/unbuffered.rb, line 135 def next item = take more = (item != Concurrent::NULL) return item, more end
@!macro channel_buffer_offer
Items can only be put onto the buffer when one or more threads are waiting to {#take} items off the buffer. When there is a thread waiting to take an item this method will give its item and return `true` immediately. When there are no threads waiting to take or the buffer is closed, this method will return `false` immediately.
# File lib/concurrent/channel/buffer/unbuffered.rb, line 71 def offer(item) synchronize do return false if ns_closed? || taking.empty? taken = taking.shift taken.value = item true end end
@!macro channel_buffer_poll
Items can only be taken off the buffer when one or more threads are waiting to {#put} items onto the buffer. When there is a thread waiting to put an item this method will take the item and return it immediately. When there are no threads waiting to put or the buffer is closed, this method will return `Concurrent::NULL` immediately.
# File lib/concurrent/channel/buffer/unbuffered.rb, line 117 def poll synchronize do return Concurrent::NULL if putting.empty? put = putting.shift value = put.value put.value = nil value end end
@!macro channel_buffer_put
Items can only be put onto the buffer when one or more threads are waiting to {#take} items off the buffer. When there is a thread waiting to take an item this method will give its item and return immediately. When there are no threads waiting to take, this method will block. As soon as a thread calls `take` the exchange will occur and this method will return.
# File lib/concurrent/channel/buffer/unbuffered.rb, line 44 def put(item) mine = synchronize do return false if ns_closed? ref = Concurrent::AtomicReference.new(item) if taking.empty? putting.push(ref) else taken = taking.shift taken.value = item ref.value = nil end ref end loop do return true if mine.value.nil? Thread.pass end end
@!macro channel_buffer_size_reader
# File lib/concurrent/channel/buffer/unbuffered.rb, line 20 def size synchronize do putting.empty? ? 0 : 1 end end
@!macro channel_buffer_take
Items can only be taken from the buffer when one or more threads are waiting to {#put} items onto the buffer. When there is a thread waiting to put an item this method will take that item and return it immediately. When there are no threads waiting to put, this method will block. As soon as a thread calls `pur` the exchange will occur and this method will return.
# File lib/concurrent/channel/buffer/unbuffered.rb, line 89 def take mine = synchronize do return Concurrent::NULL if ns_closed? && putting.empty? ref = Concurrent::AtomicReference.new(nil) if putting.empty? taking.push(ref) else put = putting.shift ref.value = put.value put.value = nil end ref end loop do item = mine.value return item if item Thread.pass end end
Private Instance Methods
@!macro channel_buffer_initialize
# File lib/concurrent/channel/buffer/unbuffered.rb, line 148 def ns_initialize # one will always be empty @putting = [] @taking = [] self.closed = false self.capacity = 1 end
# File lib/concurrent/channel/buffer/unbuffered.rb, line 143 def putting() @putting; end
# File lib/concurrent/channel/buffer/unbuffered.rb, line 145 def taking() @taking; end