class Concurrent::Channel

{include:file:doc/channel.md}

Constants

BUFFER_TYPES
DEFAULT_VALIDATOR
Error
GOROUTINES

NOTE: Move to global IO pool once stable

Public Class Methods

after(seconds)
Alias for: timer
alt(*args)
Alias for: select
go(*args, &block) click to toggle source
# File lib/concurrent/channel.rb, line 223
# Delegates to go_via, supplying GOROUTINES as the executor argument.
#
# @param args arguments forwarded unchanged to go_via
# @yield the block forwarded to go_via
# @return whatever go_via returns
def go(*args, &block)
  go_via(GOROUTINES, *args, &block)
end
go_loop(*args, &block) click to toggle source
# File lib/concurrent/channel.rb, line 232
# Delegates to go_loop_via, supplying GOROUTINES as the executor argument.
#
# @param args arguments forwarded unchanged to go_loop_via
# @yield the block forwarded to go_loop_via
# @return whatever go_loop_via returns
def go_loop(*args, &block)
  go_loop_via(GOROUTINES, *args, &block)
end
go_loop_via(executor, *args, &block) click to toggle source
# File lib/concurrent/channel.rb, line 236
# Posts a task to +executor+ that calls the block with +args+ over and over,
# stopping the first time the block returns a falsy value.
#
# @param executor object responding to +post+
# @param args arguments passed to every invocation of the block
# @yield [*args] the body to repeat; a falsy return value ends the loop
# @raise [ArgumentError] if no block is given
# @return the result of +executor.post+
def go_loop_via(executor, *args, &block)
  raise ArgumentError, 'no block given' unless block_given?

  # The block and args are also handed to +post+ as task arguments; the task
  # body below reads them from the enclosing scope instead.
  executor.post(block, *args) do
    loop { break unless block.call(*args) }
  end
end
go_via(executor, *args, &block) click to toggle source
# File lib/concurrent/channel.rb, line 227
# Posts the block to +executor+, passing +args+ along as the task arguments.
#
# @param executor object responding to +post+
# @param args arguments forwarded to +executor.post+
# @yield the block forwarded to +executor.post+
# @raise [ArgumentError] if no block is given
# @return the result of +executor.post+
def go_via(executor, *args, &block)
  raise ArgumentError, 'no block given' unless block_given?

  executor.post(*args, &block)
end
new(opts = {}) click to toggle source
# File lib/concurrent/channel.rb, line 46
# Builds a channel, choosing its buffer from +opts+.
#
# @param opts [Hash, Buffer::Base] options hash. A Buffer::Base instance is
#   also accepted (undocumented, internal use only); it is used directly as
#   the buffer and no validator is assigned in that case.
# @option opts [Integer] :capacity buffer capacity (+:size+ is read as a
#   fallback when +:capacity+ is absent)
# @option opts [Symbol] :buffer key into BUFFER_TYPES; defaults to
#   +:buffered+ when a capacity is given without a buffer type
# @option opts [#call] :validator defaults to DEFAULT_VALIDATOR
# @raise [ArgumentError] if a capacity is combined with +:unbuffered+, if the
#   buffer type needs a capacity and none (or one below 1) was given, or if
#   the buffer type is not a key of BUFFER_TYPES
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a? Buffer::Base
    self.buffer = opts
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError, 'unbuffered channels cannot have a capacity'
  elsif capacity.nil? && buffer.nil?
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    # a zero-capacity "buffered" channel is treated as unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError, 'capacity must be at least 1 for this buffer type'
  else
    buffer ||= :buffered
    buffer_class = BUFFER_TYPES[buffer]
    # Fail with a descriptive error rather than a NoMethodError on nil when
    # the requested buffer type is not registered.
    raise ArgumentError, "unknown buffer type: #{buffer.inspect}" unless buffer_class

    self.buffer = buffer_class.new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end
select(*args) { |selector, *args| ... } click to toggle source
# File lib/concurrent/channel.rb, line 215
# Creates a new Selector, yields it (followed by +args+) to the block so the
# caller can configure it, then runs it.
#
# @param args extra values yielded to the block after the selector
# @yield [selector, *args]
# @raise [ArgumentError] if no block is given
# @return the result of Selector#execute
def select(*args)
  raise ArgumentError, 'no block given' unless block_given?

  chooser = Selector.new
  yield chooser, *args
  chooser.execute
end
Also aliased as: alt
tick(interval)
Alias for: ticker
ticker(interval) click to toggle source
# File lib/concurrent/channel.rb, line 210
# Builds a Channel backed by a Buffer::Ticker constructed with +interval+.
#
# @param interval value passed to Buffer::Ticker.new
# @return [Channel] the new channel
def ticker(interval)
  ticking_buffer = Buffer::Ticker.new(interval)
  Channel.new(ticking_buffer)
end
Also aliased as: tick
timer(seconds) click to toggle source
# File lib/concurrent/channel.rb, line 205
# Builds a Channel backed by a Buffer::Timer constructed with +seconds+.
#
# @param seconds value passed to Buffer::Timer.new
# @return [Channel] the new channel
def timer(seconds)
  timing_buffer = Buffer::Timer.new(seconds)
  Channel.new(timing_buffer)
end
Also aliased as: after

Public Instance Methods

<<(item)
Alias for: put
each() { |item| ... } click to toggle source
# File lib/concurrent/channel.rb, line 192
# Pulls [item, more] pairs from do_next in a loop and yields every item that
# is not Concurrent::NULL. Iteration ends when a NULL item arrives together
# with a falsy +more+ flag; a NULL item with a truthy flag is skipped.
#
# @yield [item] each non-NULL item
# @raise [ArgumentError] if no block is given
# @return [nil]
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    item, more = do_next
    if item != Concurrent::NULL
      yield item
      next
    end
    break unless more
  end
end
next() click to toggle source

Example:

jobs = Channel.new

Channel.go do
  loop do
    j, more = jobs.next
    if more
      print "received job #{j}\n"
    else
      print "received all jobs\n"
      break
    end
  end
end
# File lib/concurrent/channel.rb, line 158
# Fetches one [item, more] pair from do_next, reporting a Concurrent::NULL
# item as nil.
#
# @return [Array] two elements: the item (nil in place of NULL) and the
#   +more+ flag exactly as do_next returned it
def next
  item, more = do_next
  [item == Concurrent::NULL ? nil : item, more]
end
next?() click to toggle source
# File lib/concurrent/channel.rb, line 164
# Fetches one [item, more] pair from do_next and wraps the item in a
# Concurrent::Maybe: +nothing+ when the item is Concurrent::NULL, otherwise
# +just(item)+.
#
# @return [Array] two elements: the Maybe and the +more+ flag exactly as
#   do_next returned it
def next?
  item, more = do_next
  wrapped = item == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(item)
  [wrapped, more]
end
offer(item) click to toggle source
# File lib/concurrent/channel.rb, line 98
# Validates +item+ (nil not allowed, no exception on failure) and, when it is
# valid, hands it to do_offer.
#
# @param item the value to offer
# @return [false] when validation fails; otherwise the result of do_offer
def offer(item)
  validate(item, false, false) ? do_offer(item) : false
end
offer!(item) click to toggle source
# File lib/concurrent/channel.rb, line 103
# Validates +item+ in raising mode (nil not allowed), then hands it to
# do_offer and insists that the offer succeeded.
#
# @param item the value to offer
# @raise [Error] when do_offer returns a falsy value
# @raise whatever validate raises for an invalid item
# @return the truthy result of do_offer
def offer!(item)
  validate(item, false, true)
  do_offer(item) || raise(Error)
end
offer?(item) click to toggle source
# File lib/concurrent/channel.rb, line 110
# Offers +item+ and reports the outcome as a Concurrent::Maybe.
#
# NOTE(review): validate is called with allow_nil = true here, whereas
# offer and offer! pass false -- confirm that accepting nil is intended.
#
# @param item the value to offer
# @return Concurrent::Maybe.nothing('invalid value') when validation fails,
#   Concurrent::Maybe.just(true) when do_offer returns a truthy value, and
#   Concurrent::Maybe.nothing otherwise
def offer?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_offer(item)

  Concurrent::Maybe.nothing
end
poll() click to toggle source
# File lib/concurrent/channel.rb, line 174
# Calls do_poll and reports a Concurrent::NULL result as nil.
#
# @return the polled item, or nil when do_poll returned Concurrent::NULL
def poll
  polled = do_poll
  return nil if polled == Concurrent::NULL

  polled
end
poll!() click to toggle source
# File lib/concurrent/channel.rb, line 178
# Calls do_poll and insists on a real item.
#
# @raise [Error] when do_poll returned Concurrent::NULL
# @return the polled item
def poll!
  polled = do_poll
  polled == Concurrent::NULL ? raise(Error) : polled
end
poll?() click to toggle source
# File lib/concurrent/channel.rb, line 184
# Calls do_poll and wraps the outcome in a Concurrent::Maybe.
#
# @return Concurrent::Maybe.nothing when do_poll returned Concurrent::NULL,
#   otherwise Concurrent::Maybe.just(item)
def poll?
  polled = do_poll
  polled == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(polled)
end
put(item) click to toggle source
# File lib/concurrent/channel.rb, line 74
# Validates +item+ (nil not allowed, no exception on failure) and, when it is
# valid, hands it to do_put.
#
# @param item the value to put
# @return [false] when validation fails; otherwise the result of do_put
def put(item)
  validate(item, false, false) ? do_put(item) : false
end
Also aliased as: send, <<
put!(item) click to toggle source
# File lib/concurrent/channel.rb, line 81
# Validates +item+ in raising mode (nil not allowed), then hands it to do_put
# and insists that the put succeeded.
#
# @param item the value to put
# @raise [Error] when do_put returns a falsy value
# @raise whatever validate raises for an invalid item
# @return the truthy result of do_put
def put!(item)
  validate(item, false, true)
  do_put(item) || raise(Error)
end
put?(item) click to toggle source
# File lib/concurrent/channel.rb, line 88
# Puts +item+ and reports the outcome as a Concurrent::Maybe.
#
# NOTE(review): validate is called with allow_nil = true here, whereas
# put and put! pass false -- confirm that accepting nil is intended.
#
# @param item the value to put
# @return Concurrent::Maybe.nothing('invalid value') when validation fails,
#   Concurrent::Maybe.just(true) when do_put returns a truthy value, and
#   Concurrent::Maybe.nothing otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)
  return Concurrent::Maybe.just(true) if do_put(item)

  Concurrent::Maybe.nothing
end
receive()
Alias for: take
send(item)
Alias for: put
take() click to toggle source
# File lib/concurrent/channel.rb, line 120
# Calls do_take and reports a Concurrent::NULL result as nil.
#
# @return the taken item, or nil when do_take returned Concurrent::NULL
def take
  taken = do_take
  return nil if taken == Concurrent::NULL

  taken
end
Also aliased as: receive, ~
take!() click to toggle source
# File lib/concurrent/channel.rb, line 127
# Calls do_take and insists on a real item.
#
# @raise [Error] when do_take returned Concurrent::NULL
# @return the taken item
def take!
  taken = do_take
  taken == Concurrent::NULL ? raise(Error) : taken
end
take?() click to toggle source
# File lib/concurrent/channel.rb, line 133
# Calls do_take and wraps the outcome in a Concurrent::Maybe.
#
# @return Concurrent::Maybe.nothing when do_take returned Concurrent::NULL,
#   otherwise Concurrent::Maybe.just(item)
def take?
  taken = do_take
  return Concurrent::Maybe.nothing if taken == Concurrent::NULL

  Concurrent::Maybe.just(taken)
end
~()
Alias for: take

Private Instance Methods

buffer() click to toggle source
# File lib/concurrent/channel.rb, line 256
# Reader for the @buffer instance variable.
#
# @return the object stored in @buffer
def buffer
  @buffer
end
buffer=(value) click to toggle source
# File lib/concurrent/channel.rb, line 260
# Writer for the @buffer instance variable.
#
# @param value the object to store in @buffer
def buffer=(value)
  @buffer = value
end
do_next() click to toggle source
# File lib/concurrent/channel.rb, line 289
# Delegates to the buffer's +next+ method.
#
# @return whatever buffer.next returns
def do_next
  buffer.next
end
do_offer(item) click to toggle source
# File lib/concurrent/channel.rb, line 281
# Delegates to the buffer's +offer+ method.
#
# @param item the value passed to buffer.offer
# @return whatever buffer.offer returns
def do_offer(item)
  buffer.offer(item)
end
do_poll() click to toggle source
# File lib/concurrent/channel.rb, line 293
# Delegates to the buffer's +poll+ method.
#
# @return whatever buffer.poll returns
def do_poll
  buffer.poll
end
do_put(item) click to toggle source
# File lib/concurrent/channel.rb, line 277
# Delegates to the buffer's +put+ method.
#
# @param item the value passed to buffer.put
# @return whatever buffer.put returns
def do_put(item)
  buffer.put(item)
end
do_take() click to toggle source
# File lib/concurrent/channel.rb, line 285
# Delegates to the buffer's +take+ method.
#
# @return whatever buffer.take returns
def do_take
  buffer.take
end
validate(value, allow_nil, raise_error) click to toggle source
# File lib/concurrent/channel.rb, line 264
# Checks +value+ against the nil rule and the configured validator.
#
# @param value the candidate value
# @param allow_nil when falsy, a nil value is rejected before the validator
#   is consulted
# @param raise_error when truthy, a rejection (or any StandardError raised
#   while validating) is raised; when falsy it is reported as false
# @raise [ValidationError] in raising mode, when the value is rejected
# @return [Boolean] true when the value is accepted, false otherwise
def validate(value, allow_nil, raise_error)
  if !allow_nil && value.nil?
    raise ValidationError, 'nil is not a valid value' if raise_error

    false
  elsif !validator.call(value)
    raise ValidationError if raise_error

    false
  else
    true
  end
rescue => ex
  # Reached both for errors raised by the validator itself and for the
  # ValidationError raised above; re-raise only in raising mode.
  raise ex if raise_error

  false
end
validator() click to toggle source
# File lib/concurrent/channel.rb, line 248
# Reader for the @validator instance variable.
#
# @return the object stored in @validator
def validator
  @validator
end
validator=(value) click to toggle source
# File lib/concurrent/channel.rb, line 252
# Writer for the @validator instance variable.
#
# @param value the object to store in @validator
def validator=(value)
  @validator = value
end