class Concurrent::Channel
{include:file:doc/channel.md}
Constants
- BUFFER_TYPES
- DEFAULT_VALIDATOR
- Error
- GOROUTINES
NOTE: Move to global IO pool once stable
Public Class Methods
go(*args, &block)
click to toggle source
# File lib/concurrent/channel.rb, line 223 def go(*args, &block) go_via(GOROUTINES, *args, &block) end
go_loop(*args, &block)
click to toggle source
# File lib/concurrent/channel.rb, line 232 def go_loop(*args, &block) go_loop_via(GOROUTINES, *args, &block) end
go_loop_via(executor, *args, &block)
click to toggle source
# File lib/concurrent/channel.rb, line 236 def go_loop_via(executor, *args, &block) raise ArgumentError.new('no block given') unless block_given? executor.post(block, *args) do loop do break unless block.call(*args) end end end
go_via(executor, *args, &block)
click to toggle source
# File lib/concurrent/channel.rb, line 227 def go_via(executor, *args, &block) raise ArgumentError.new('no block given') unless block_given? executor.post(*args, &block) end
new(opts = {})
click to toggle source
# File lib/concurrent/channel.rb, line 46 def initialize(opts = {}) # undocumented -- for internal use only if opts.is_a? Buffer::Base self.buffer = opts return end capacity = opts[:capacity] || opts[:size] buffer = opts[:buffer] if capacity && buffer == :unbuffered raise ArgumentError.new('unbuffered channels cannot have a capacity') elsif capacity.nil? && buffer.nil? self.buffer = BUFFER_TYPES[:unbuffered].new elsif capacity == 0 && buffer == :buffered self.buffer = BUFFER_TYPES[:unbuffered].new elsif buffer == :unbuffered self.buffer = BUFFER_TYPES[:unbuffered].new elsif capacity.nil? || capacity < 1 raise ArgumentError.new('capacity must be at least 1 for this buffer type') else buffer ||= :buffered self.buffer = BUFFER_TYPES[buffer].new(capacity) end self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR) end
select(*args) { |selector, *args| ... }
click to toggle source
# File lib/concurrent/channel.rb, line 215 def select(*args) raise ArgumentError.new('no block given') unless block_given? selector = Selector.new yield(selector, *args) selector.execute end
Also aliased as: alt
ticker(interval)
click to toggle source
# File lib/concurrent/channel.rb, line 210 def ticker(interval) Channel.new(Buffer::Ticker.new(interval)) end
Also aliased as: tick
timer(seconds)
click to toggle source
# File lib/concurrent/channel.rb, line 205 def timer(seconds) Channel.new(Buffer::Timer.new(seconds)) end
Also aliased as: after
Public Instance Methods
each() { |item| ... }
click to toggle source
# File lib/concurrent/channel.rb, line 192 def each raise ArgumentError.new('no block given') unless block_given? loop do item, more = do_next if item != Concurrent::NULL yield(item) elsif !more break end end end
next()
click to toggle source
@example
jobs = Channel.new Channel.go do loop do j, more = jobs.next if more print "received job #{j}\n" else print "received all jobs\n" break end end end
# File lib/concurrent/channel.rb, line 158 def next item, more = do_next item = nil if item == Concurrent::NULL return item, more end
next?()
click to toggle source
# File lib/concurrent/channel.rb, line 164 def next? item, more = do_next item = if item == Concurrent::NULL Concurrent::Maybe.nothing else Concurrent::Maybe.just(item) end return item, more end
offer(item)
click to toggle source
# File lib/concurrent/channel.rb, line 98 def offer(item) return false unless validate(item, false, false) do_offer(item) end
offer!(item)
click to toggle source
# File lib/concurrent/channel.rb, line 103 def offer!(item) validate(item, false, true) ok = do_offer(item) raise Error if !ok ok end
offer?(item)
click to toggle source
# File lib/concurrent/channel.rb, line 110 def offer?(item) if !validate(item, true, false) Concurrent::Maybe.nothing('invalid value') elsif do_offer(item) Concurrent::Maybe.just(true) else Concurrent::Maybe.nothing end end
poll()
click to toggle source
# File lib/concurrent/channel.rb, line 174 def poll (item = do_poll) == Concurrent::NULL ? nil : item end
poll!()
click to toggle source
# File lib/concurrent/channel.rb, line 178 def poll! item = do_poll raise Error if item == Concurrent::NULL item end
poll?()
click to toggle source
# File lib/concurrent/channel.rb, line 184 def poll? if (item = do_poll) == Concurrent::NULL Concurrent::Maybe.nothing else Concurrent::Maybe.just(item) end end
put(item)
click to toggle source
# File lib/concurrent/channel.rb, line 74 def put(item) return false unless validate(item, false, false) do_put(item) end
put!(item)
click to toggle source
# File lib/concurrent/channel.rb, line 81 def put!(item) validate(item, false, true) ok = do_put(item) raise Error if !ok ok end
put?(item)
click to toggle source
# File lib/concurrent/channel.rb, line 88 def put?(item) if !validate(item, true, false) Concurrent::Maybe.nothing('invalid value') elsif do_put(item) Concurrent::Maybe.just(true) else Concurrent::Maybe.nothing end end
take()
click to toggle source
# File lib/concurrent/channel.rb, line 120 def take item = do_take item == Concurrent::NULL ? nil : item end
take!()
click to toggle source
# File lib/concurrent/channel.rb, line 127 def take! item = do_take raise Error if item == Concurrent::NULL item end
take?()
click to toggle source
# File lib/concurrent/channel.rb, line 133 def take? item = do_take item = if item == Concurrent::NULL Concurrent::Maybe.nothing else Concurrent::Maybe.just(item) end item end
Private Instance Methods
buffer()
click to toggle source
# File lib/concurrent/channel.rb, line 256 def buffer @buffer end
buffer=(value)
click to toggle source
# File lib/concurrent/channel.rb, line 260 def buffer=(value) @buffer = value end
do_next()
click to toggle source
# File lib/concurrent/channel.rb, line 289 def do_next buffer.next end
do_offer(item)
click to toggle source
# File lib/concurrent/channel.rb, line 281 def do_offer(item) buffer.offer(item) end
do_poll()
click to toggle source
# File lib/concurrent/channel.rb, line 293 def do_poll buffer.poll end
do_put(item)
click to toggle source
# File lib/concurrent/channel.rb, line 277 def do_put(item) buffer.put(item) end
do_take()
click to toggle source
# File lib/concurrent/channel.rb, line 285 def do_take buffer.take end
validate(value, allow_nil, raise_error)
click to toggle source
# File lib/concurrent/channel.rb, line 264 def validate(value, allow_nil, raise_error) if !allow_nil && value.nil? raise_error ? raise(ValidationError.new('nil is not a valid value')) : false elsif !validator.call(value) raise_error ? raise(ValidationError) : false else true end rescue => ex # the validator raised an exception return raise_error ? raise(ex) : false end
validator()
click to toggle source
# File lib/concurrent/channel.rb, line 248 def validator @validator end
validator=(value)
click to toggle source
# File lib/concurrent/channel.rb, line 252 def validator=(value) @validator = value end