class ThrottleQueue

ThrottleQueue is a thread-safe rate-limited work queue. It allows both background and foreground operations.

Example:

q = ThrottleQueue 3
files.each {|file|
  q.background(file) {|id|
    fetch file
  }
}

Public Class Methods

new(limit) click to toggle source

Creates a new ThrottleQueue with the given rate limit (per second).

# File lib/throttle-queue/single-process.rb, line 14
def initialize(limit)
        raise "refusing to do zero work per second" if limit <= 0
        @limit = limit

        @queue = PriorityQueue.new

        @mutex = Mutex.new
        @pausing = ConditionVariable.new
        @idle = ConditionVariable.new
        @in_flight = nil
        @processing_thread = nil
        @items = {}

        @throttling = nil
        @state = :idle
        @t0 = Time.now
end

Public Instance Methods

background(id, &block) click to toggle source

Adds work to the queue to run in the background, and returns immediately.

If the block takes an argument, it will be passed the same id used to queue the work.

# File lib/throttle-queue/single-process.rb, line 58
def background(id, &block)
        @mutex.synchronize {
                unless @items.has_key? id
                        @items[id] = block
                        @queue.background id
                        run
                end
        }
end
foreground(id, &block) click to toggle source

Adds work to the queue ahead of all background work, and blocks until the given block has been called.

Will preempt an id of the same value in the background queue, and wait on an id of the same value already in the foreground queue.

If the block takes an argument, it will be passed the same id used to queue the work.

# File lib/throttle-queue/single-process.rb, line 76
def foreground(id, &block)
        t = nil
        @mutex.synchronize {
                if id == @in_flight
                        t = @processing_thread unless @processing_thread == Thread.current
                else
                        t = @items[id]
                        unless t.is_a? FG
                                t = @items[id] = FG.new block, self
                                @queue.foreground id
                                run
                        end
                end
        }
        t.join if t
end
idle?() click to toggle source

Returns true if there is nothing queued and no threads are running

# File lib/throttle-queue/single-process.rb, line 41
def idle?
        @state == :idle
end
shutdown() click to toggle source

Signals the queue to stop processing and shutdown.

Items still in the queue are dropped. Any item currently in flight will finish.

# File lib/throttle-queue/single-process.rb, line 35
def shutdown
        @queue.shutdown
        @pausing.signal
end
wait(timeout = nil) click to toggle source

Blocks the calling thread while the queue processes work.

Returns after the timeout has expired, or after the queue returns to the idle state.

# File lib/throttle-queue/single-process.rb, line 48
def wait(timeout = nil)
        @mutex.synchronize {
                @idle.wait(@mutex, timeout) unless idle?
        }
end

Private Instance Methods

run() click to toggle source
# File lib/throttle-queue/single-process.rb, line 94
def run
        return unless @state == :idle
        @state = :running
        @throttling = Thread.new {
                loop {
                        break if @queue.shutdown? or @queue.empty?

                        elapsed = Time.now - @t0
                        wait_time = 1.0 / @limit + 0.01
                        if @processing_thread and elapsed < wait_time
                                @mutex.synchronize {
                                        @pausing.wait @mutex, wait_time - elapsed
                                }
                        end

                        if id = @queue.pop
                                @mutex.synchronize {
                                        @in_flight = id
                                        @processing_thread = Thread.new {
                                                block = @items[@in_flight]
                                                if block.arity == 0
                                                        block.call
                                                else
                                                        block.call @in_flight
                                                end
                                        }
                                }
                                @processing_thread.join if @processing_thread

                                @mutex.synchronize {
                                        @items.delete @in_flight
                                        @in_flight = nil
                                }
                        end

                        @t0 = Time.now
                }

                @mutex.synchronize {
                        @state = :idle
                        if @queue.shutdown? or @queue.empty?
                                @idle.broadcast
                        else
                                # Restart to prevent a join deadlock
                                send :run
                        end
                }
        }
end