class Twirl::Cluster

Constants

RetryableErrors

Private: The default Array of errors to retry.

Attributes

client_index[R]

Private: What is the array index of the client being used currently.

command_count[R]

Private: The number of commands issued to the current client.

commands_per_client[R]

Private: The number of commands to issue to a client before rotating.

encoder[R]

Private: What handles dumping and loading values.

instrumenter[R]

Private: What should be used to instrument all the things.

retries[R]

Private: The number of times to retry retryable errors.

retryable_errors[R]

Private: What errors should be considered retryable.

Public Class Methods

new(clients, options = {}) click to toggle source

Public: Initialize a new cluster.

clients - An Array of KJess::Client instances, each with a host and port (e.g. localhost:1234).
options - A Hash of options.

:commands_per_client - The Number of commands to run per client
                       before rotating to the next client (default: 100)
:retries - The Number of times a command should be retried (default: 5).
:instrumenter - Where to send instrumentation (default: noop).
:encoder - What to use to dump/load values (default: mirror).
# File lib/twirl/cluster.rb, line 55
# Public: Initialize a new cluster.
#
# clients - An Array of clients; a shuffled copy is stored so the caller's
#           array is left untouched.
# options - A Hash of options (all optional):
#           :commands_per_client - commands issued before rotating (default: 100)
#           :retries             - value handed to with_retries (default: 5)
#           :retryable_errors    - errors to retry (default: RetryableErrors)
#           :encoder             - dumps/loads values (default: Mirror)
#           :instrumenter        - instrumentation sink (default: Instrumenters::Noop)
def initialize(clients, options = {})
  # Rotation state: start on the first (shuffled) client with no commands issued.
  @clients = clients.shuffle
  @client_index = 0
  @command_count = 0

  # Tunables, each falling back to its default when the key is absent.
  @commands_per_client = options.fetch(:commands_per_client, 100)
  @retries = options.fetch(:retries, 5)
  @retryable_errors = options.fetch(:retryable_errors, RetryableErrors)

  # Collaborators.
  @encoder = options.fetch(:encoder, Mirror)
  @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
end

Public Instance Methods

client() click to toggle source

Private: Returns the client to be used to issue a command.

# File lib/twirl/cluster.rb, line 219
# Private: Returns the client to be used to issue a command.
#
# Rotates to the next client first when the current one has already been
# handed out @commands_per_client times, then counts this hand-out.
def client
  quota_used_up = @command_count >= @commands_per_client
  rotate if quota_used_up

  @command_count += 1
  @clients[@client_index]
end
client_read_op(client, op, queue_name, *args) click to toggle source

Private: Perform an operation for a given client. Rotates clients if nil item is result of op.

Returns a Twirl::Item if an item was found, otherwise nil.

# File lib/twirl/cluster.rb, line 249
# Private: Perform a read operation against a given client.
#
# client     - The client the op is sent to (the same one on every retry).
# op         - The Symbol method name to send to the client.
# queue_name - The String name of the queue.
# args       - Extra arguments forwarded to the client after queue_name.
#
# Returns a Twirl::Item if the client returned a value, otherwise nil
# (in which case the next `client` call is forced to rotate).
def client_read_op(client, op, queue_name, *args)
  with_retries do |tries|
    @instrumenter.instrument("op.twirl") do |payload|
      payload[:op] = op
      payload[:queue_name] = queue_name
      # Any attempt after the first has a decremented tries counter.
      payload[:retry] = tries != @retries

      raw = client.send(op, queue_name, *args)
      if raw
        # Size is measured on the raw value, before decoding.
        payload[:bytes] = raw.size
        Item.new(queue_name, @encoder.load(raw), client, @instrumenter)
      else
        rotate_for_next_op
        nil
      end
    end
  end
end
delete(queue_name) click to toggle source

Public: Remove a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.

# File lib/twirl/cluster.rb, line 143
# Public: Remove a queue by sending :delete for it to every client.
#
# queue_name - The String name of the queue.
#
# Returns a Hash of "host:port" keys and per-client results.
def delete(queue_name)
  multi_client_queue_op_with_result(:delete, queue_name)
end
disconnect() click to toggle source

Public: Disconnect from each client's server.

Returns nothing.

# File lib/twirl/cluster.rb, line 207
# Public: Disconnect from each client's server by sending :disconnect
# to every client.
def disconnect
  multi_client_op(:disconnect)
end
each() { |client| ... } click to toggle source

Public: Iterate through the clients.

# File lib/twirl/cluster.rb, line 67
# Public: Iterate through the clients.
#
# block - Called once per client, in the cluster's (shuffled) order.
#
# Returns the Array of clients when a block is given, or an Enumerator
# over the clients when called without a block.
def each(&block)
  # Forward the captured block instead of re-yielding; with no block this
  # yields an Enumerator rather than raising LocalJumpError.
  @clients.each(&block)
end
flush(queue_name) click to toggle source

Public: Remove all items from a queue.

queue_name - The String name of the queue.

Returns a Hash of hosts and results.

# File lib/twirl/cluster.rb, line 152
# Public: Remove all items from a queue by sending :flush for it to
# every client.
#
# queue_name - The String name of the queue.
#
# Returns a Hash of "host:port" keys and per-client results.
def flush(queue_name)
  multi_client_queue_op_with_result(:flush, queue_name)
end
flush_all() click to toggle source

Public: Remove all items from all queues.

Returns a Hash of hosts and results.

# File lib/twirl/cluster.rb, line 159
# Public: Remove all items from all queues by sending :flush_all to
# every client.
#
# Returns a Hash of "host:port" keys and per-client results.
def flush_all
  multi_client_op_with_result(:flush_all)
end
get(queue_name, options = {}) click to toggle source

Public: Retrieve an item from the given queue.

It is possible to send both :open and :close in the same get operation, but I would not recommend it. You will end up in a situation where the client will rotate and the :close then goes to the wrong client.

We could do two get operations if you pass both options, send the :close to the current client and send the :open as a second operation to the rotated client, but that seems sneaky.

queue_name - The String name of the queue.
options - The Hash of options for retrieving an item.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.

# File lib/twirl/cluster.rb, line 111
# Public: Retrieve an item from the given queue using the current client.
#
# queue_name - The String name of the queue.
# options    - The Hash of options, forwarded untouched to the client's get.
#
# Returns a Twirl::Item if an item was found, otherwise nil.
def get(queue_name, options = {})
  client_read_op(client, :get, queue_name, options)
end
multi_client_op(op, *args) { |client| ... } click to toggle source

Private: Perform an op on all the clients.

# File lib/twirl/cluster.rb, line 269
# Private: Perform an op on all the clients inside one "op.twirl"
# instrumentation event.
#
# op    - The Symbol op name; recorded in the payload and, when no block is
#         given, sent to each client.
# args  - Arguments passed along with op when no block is given.
# block - Optional; when given it is called with each client instead of
#         sending op.
#
# Returns whatever the instrumenter returns for the instrumented block.
def multi_client_op(op, *args, &block)
  @instrumenter.instrument("op.twirl") do |payload|
    payload[:op] = op

    @clients.each do |client|
      block_given? ? yield(client) : client.send(op, *args)
    end
  end
end
multi_client_op_with_result(op, *args) { |client| ... } click to toggle source

Private: Perform an op on all clients.

Returns a Hash of the servers as keys and the results as values.

# File lib/twirl/cluster.rb, line 303
# Private: Perform an op on all clients inside one "op.twirl"
# instrumentation event, collecting each client's result.
#
# op    - The Symbol op name; recorded in the payload and, when no block is
#         given, sent to each client.
# args  - Arguments passed along with op when no block is given.
# block - Optional; when given its return value for each client is used as
#         that client's result instead of sending op.
#
# Returns a Hash of "host:port" keys and per-client results.
def multi_client_op_with_result(op, *args, &block)
  @instrumenter.instrument("op.twirl") do |payload|
    payload[:op] = op

    @clients.each_with_object({}) do |client, results|
      server = "#{client.host}:#{client.port}"
      results[server] = block_given? ? yield(client) : client.send(op, *args)
    end
  end
end
multi_client_queue_op_with_result(op, queue_name, *args) { |client| ... } click to toggle source
# File lib/twirl/cluster.rb, line 283
# Private: Perform a queue-specific op on all clients inside one
# "op.twirl" instrumentation event, collecting each client's result.
#
# op         - The Symbol op name; recorded in the payload and, when no
#              block is given, sent to each client.
# queue_name - The String name of the queue; recorded in the payload and
#              passed as the first argument of op.
# args       - Extra arguments passed after queue_name when no block is given.
# block      - Optional; when given its return value for each client is
#              used as that client's result instead of sending op.
#
# Returns a Hash of "host:port" keys and per-client results.
def multi_client_queue_op_with_result(op, queue_name, *args, &block)
  @instrumenter.instrument("op.twirl") do |payload|
    payload[:op] = op
    payload[:queue_name] = queue_name

    @clients.each_with_object({}) do |client, results|
      server = "#{client.host}:#{client.port}"
      results[server] = block_given? ? yield(client) : client.send(op, queue_name, *args)
    end
  end
end
peek(queue_name) click to toggle source

Public: Peek at the top item in the queue.

queue_name - The String name of the queue.

Returns a Twirl::Item if an item was found, otherwise nil.

# File lib/twirl/cluster.rb, line 134
# Public: Peek at the top item in the queue using the current client.
#
# queue_name - The String name of the queue.
#
# Returns a Twirl::Item if an item was found, otherwise nil.
def peek(queue_name)
  client_read_op(client, :peek, queue_name)
end
ping() click to toggle source

Public: Which clients can actually reach their server.

Returns Hash of hosts and results.

# File lib/twirl/cluster.rb, line 179
# Public: Which clients can actually reach their server. Sends :ping to
# every client.
#
# Returns a Hash of "host:port" keys and per-client results.
def ping
  multi_client_op_with_result(:ping)
end
quit() click to toggle source

Public: Disconnect from each client's server.

Returns Hash of hosts and results.

# File lib/twirl/cluster.rb, line 200
# Public: Sends :quit to every client.
#
# Returns a Hash of "host:port" keys and per-client results.
def quit
  multi_client_op_with_result(:quit)
end
reload() click to toggle source

Public: Reload the config of each client's server.

Returns Hash of hosts and results.

# File lib/twirl/cluster.rb, line 186
# Public: Reload the config of each client's server. Sends :reload to
# every client.
#
# Returns a Hash of "host:port" keys and per-client results.
def reload
  multi_client_op_with_result(:reload)
end
reserve(queue_name, options = {}) click to toggle source

Public: Reserve the next item on the queue.

This is a helper method to get an item from a queue and open it for reliable read.

queue_name - The String name of the queue.
options - Additional options.

See KJess::Client#get for all options.

Returns a Twirl::Item if an item was found, otherwise nil.

# File lib/twirl/cluster.rb, line 125
# Public: Reserve the next item on the queue using the current client.
#
# queue_name - The String name of the queue.
# options    - The Hash of additional options, forwarded untouched to the
#              client's reserve.
#
# Returns a Twirl::Item if an item was found, otherwise nil.
def reserve(queue_name, options = {})
  client_read_op(client, :reserve, queue_name, options)
end
rotate() click to toggle source

Private: Ensures that clients will be rotated by changing the client index and resetting the command count.

# File lib/twirl/cluster.rb, line 228
# Private: Moves on to the next client, wrapping around at the end of the
# list, and resets the command count.
#
# Emits an "op.twirl" event carrying the counts as they were before the
# reset.
#
# Returns the new client index.
def rotate
  event = {
    op: :rotate,
    metric_type: :counter,
    command_count: @command_count,
    commands_per_client: @commands_per_client,
  }
  @instrumenter.instrument("op.twirl", event)

  @command_count = 0
  @client_index = @client_index.succ % @clients.size
end
rotate_for_next_op() click to toggle source

Private: Makes it so the client will rotate for the next operation.

# File lib/twirl/cluster.rb, line 241
# Private: Makes it so the client will rotate for the next operation.
#
# Sets the command count to the per-client limit, which is the threshold
# `client` checks (>=) before deciding to rotate.
def rotate_for_next_op
  @command_count = @commands_per_client
end
set(queue_name, item, expiration = 0) click to toggle source

Public: Add an item to the given queue.

queue_name - The String name of the queue.
item - The String item to add to the queue.
expiration - The Number of seconds from now to expire the item (default: 0).

Returns true if successful, false otherwise.

# File lib/twirl/cluster.rb, line 78
# Public: Add an item to the given queue.
#
# queue_name - The String name of the queue.
# item       - The item to add; it is run through the encoder's dump unless
#              it is nil/false, in which case nil is sent.
# expiration - Passed through to the client's set as its third argument
#              (default: 0).
#
# Each attempt asks `client` for the client to use, so a retry counts as
# another command and may land on a different client.
#
# Returns whatever the client's set returns.
def set(queue_name, item, expiration = 0)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = :set
      payload[:queue_name] = queue_name
      # Any attempt after the first has a decremented tries counter.
      payload[:retry] = tries != @retries

      value = @encoder.dump(item) if item
      # Report the size of what is actually sent (the encoded value, in
      # bytes) rather than the character count of the un-encoded item.
      payload[:bytes] = value.to_s.bytesize
      client.set(queue_name, value, expiration)
    }
  }
end
shutdown() click to toggle source

Public: Tells each client to shutdown their server.

Returns nothing.

# File lib/twirl/cluster.rb, line 214
# Public: Tells each client to shut down its server by sending :shutdown
# to every client.
def shutdown
  multi_client_op(:shutdown)
end
stats() click to toggle source

Public: Return stats for each client's server.

Returns a Hash of stats for each host.

# File lib/twirl/cluster.rb, line 193
# Public: Return stats for each client's server. Sends :stats to every
# client.
#
# Returns a Hash of "host:port" keys and per-client stats.
def stats
  multi_client_op_with_result(:stats)
end
version() click to toggle source

Public: Return the version of each server.

Returns a Hash of hosts and results.

# File lib/twirl/cluster.rb, line 166
# Public: Return the version of each server.
#
# A client whose version call raises KJess::ProtocolError is reported as
# the String "unavailable" instead of failing the whole call.
#
# Returns a Hash of "host:port" keys and version results.
def version
  multi_client_op_with_result(:version) { |server|
    begin
      server.version
    rescue KJess::ProtocolError
      "unavailable"
    end
  }
end
with_retries() { |tries| ... } click to toggle source

Private: Retries an operation a number of times if it raises exception.

# File lib/twirl/cluster.rb, line 320
# Private: Runs the block, re-running it when it raises one of
# @retryable_errors.
#
# The block receives the remaining-attempts counter, which starts at
# @retries and drops by one after each rescued failure. Once the counter is
# no longer positive the last error is re-raised, so the block runs at most
# @retries times (once when @retries is 1 or less).
#
# Returns the block's return value.
def with_retries
  remaining = @retries
  begin
    yield remaining
  rescue *@retryable_errors
    remaining -= 1
    raise unless remaining > 0
    retry
  end
end