class Twirl::Cluster
Constants
- RetryableErrors
Private: The default Array of errors to retry.
Attributes
- client_index
Private: What is the array index of the client being used currently.
- command_count
Private: The number of commands issued to the current client.
- commands_per_client
Private: The number of commands to issue to a client before rotating.
- encoder
Private: What handles dumping and loading values.
- instrumenter
Private: What should be used to instrument all the things.
- retries
Private: The number of times to retry retryable errors.
- retryable_errors
Private: What errors should be considered retryable.
Public Class Methods
Public: Initialize a new cluster.
clients - An Array of KJess::Client instances with port (localhost:1234).
options - A Hash of options.
          :commands_per_client - The Number of commands to run per client before rotating to the next client (default: 100).
          :retries - The Number of times a command should be retried (default: 5).
          :instrumenter - Where to send instrumentation (default: noop).
          :encoder - What to use to dump/load values (default: mirror).
          :retryable_errors - The Array of errors to retry (default: RetryableErrors).
# File lib/twirl/cluster.rb, line 55
def initialize(clients, options = {})
  @client_index = 0
  @command_count = 0
  @clients = clients.shuffle
  @retries = options.fetch(:retries, 5)
  @commands_per_client = options.fetch(:commands_per_client, 100)
  @instrumenter = options.fetch(:instrumenter, Instrumenters::Noop)
  @encoder = options.fetch(:encoder, Mirror)
  @retryable_errors = options.fetch(:retryable_errors, RetryableErrors)
end
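The options are read with Hash#fetch, which has one consequence worth knowing: a key that is present but set to nil is returned as nil and does not fall back to the default. The following is a minimal sketch of that behavior only. resolve_options is a hypothetical helper, not part of Twirl, and it copies just the two numeric options.

```ruby
# Hypothetical helper (not part of Twirl): reads :retries and
# :commands_per_client the same way initialize does, with Hash#fetch.
def resolve_options(options = {})
  {
    retries: options.fetch(:retries, 5),
    commands_per_client: options.fetch(:commands_per_client, 100),
  }
end

resolve_options[:retries]                # => 5
resolve_options(retries: 2)[:retries]    # => 2
resolve_options(retries: nil)[:retries]  # => nil (the default is not applied)
```

If callers may pass nil for an option, they would need to drop the key before building the cluster.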
Public Instance Methods
Private: Returns the client to be used to issue a command.
# File lib/twirl/cluster.rb, line 219 def client rotate if @command_count >= @commands_per_client @command_count += 1 @clients[@client_index] end
Private: Perform an operation for a given client. Rotates to the next client if the op returns no item.
Returns a Twirl::Item if an item was found, otherwise nil.
# File lib/twirl/cluster.rb, line 249
def client_read_op(client, op, queue_name, *args)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = op
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      if value = client.send(op, queue_name, *args)
        payload[:bytes] = value.size
        value = @encoder.load(value) if value
        Item.new queue_name, value, client, @instrumenter
      else
        rotate_for_next_op
        nil
      end
    }
  }
end
Public: Remove a queue.
queue_name - The String name of the queue.
Returns a Hash of hosts and results.
# File lib/twirl/cluster.rb, line 143
def delete(queue_name)
  multi_client_queue_op_with_result :delete, queue_name
end
Public: Disconnect from each client's server.
Returns nothing.
# File lib/twirl/cluster.rb, line 207
def disconnect
  multi_client_op :disconnect
end
Public: Iterate through the clients.
# File lib/twirl/cluster.rb, line 67
def each(&block)
  @clients.each { |client| yield client }
end
Public: Remove all items from a queue.
queue_name - The String name of the queue.
Returns a Hash of hosts and results.
# File lib/twirl/cluster.rb, line 152
def flush(queue_name)
  multi_client_queue_op_with_result :flush, queue_name
end
Public: Remove all items from all queues.
Returns a Hash of hosts and results.
# File lib/twirl/cluster.rb, line 159
def flush_all
  multi_client_op_with_result :flush_all
end
Public: Retrieve an item from the given queue.
It is possible to send both :open and :close in the same get operation, but I would not recommend it. You will end up in a situation where the client will rotate and the :close then goes to the wrong client.
We could do two get operations if you pass both options, send the :close to the current client and send the :open as a second operation to the rotated client, but that seems sneaky.
queue_name - The String name of the queue.
options - The Hash of options for retrieving an item.
See KJess::Client#get for all options.
Returns a Twirl::Item if an item was found, otherwise nil.
# File lib/twirl/cluster.rb, line 111
def get(queue_name, options = {})
  client_read_op client, :get, queue_name, options
end
Private: Perform an op on all the clients.
# File lib/twirl/cluster.rb, line 269
def multi_client_op(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    @clients.each do |client|
      if block_given?
        yield client
      else
        client.send(op, *args)
      end
    end
  }
end
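Judging only from the calls in this file, an instrumenter needs an instrument(name, payload) method that yields a mutable payload Hash when a block is given, tolerates being called without a block, and returns the block's value (the read ops rely on that return value to hand back the item). The class below is a hypothetical in-memory sketch of that contract. It is not one of Twirl's own instrumenters, and the name MemoryInstrumenter is an assumption made for illustration.

```ruby
# Hypothetical instrumenter that records events in memory.
# Assumed contract, inferred from how @instrumenter is called in this file:
#   instrument(name, payload = {}) { |payload| ... } -> the block's value
class MemoryInstrumenter
  Event = Struct.new(:name, :payload, :result)

  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    # Run the wrapped operation, letting it fill in the payload.
    result = yield payload if block_given?
    @events << Event.new(name, payload, result)
    # Return the operation's value so callers still receive it.
    result
  end
end

instrumenter = MemoryInstrumenter.new
value = instrumenter.instrument("op.twirl") { |payload|
  payload[:op] = :ping
  true
}
value                               # => true
instrumenter.events.first.payload   # => {op: :ping} (shown as {:op=>:ping} on older Rubies)
```

An object like this could be passed as the :instrumenter option, for example to assert on emitted payloads in tests.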
Private: Perform an op on all clients.
Returns a Hash of the servers as keys and the results as values.
# File lib/twirl/cluster.rb, line 303
def multi_client_op_with_result(op, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, *args)
      end
    }
    result
  }
end
Private: Perform a queue op on all clients.
Returns a Hash of the servers as keys and the results as values.
# File lib/twirl/cluster.rb, line 283
def multi_client_queue_op_with_result(op, queue_name, *args, &block)
  @instrumenter.instrument("op.twirl") { |payload|
    payload[:op] = op
    payload[:queue_name] = queue_name

    result = {}
    @clients.each { |client|
      result["#{client.host}:#{client.port}"] = if block_given?
        yield client
      else
        client.send(op, queue_name, *args)
      end
    }
    result
  }
end
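The result-collecting helpers all build the same shape: a Hash keyed by "host:port" whose value is either the return of the op or, when a block is given, the return of the block (in which case the op name is only used as an instrumentation label). A minimal sketch of that shape, with instrumentation left out. FakeClient and op_with_result are hypothetical stand-ins, and the fake ping raising IOError is an assumption made for the demo, not documented KJess behavior.

```ruby
# Hypothetical stand-in for KJess::Client: only host, port, and ping.
FakeClient = Struct.new(:host, :port, :up) do
  def ping
    raise IOError, "connection refused" unless up
    true
  end
end

# Same result-building loop as the helpers above, minus instrumentation.
def op_with_result(clients, op, *args)
  result = {}
  clients.each do |client|
    result["#{client.host}:#{client.port}"] =
      if block_given?
        yield client            # block decides the value for this host
      else
        client.send(op, *args)  # otherwise send the op as-is
      end
  end
  result
end

clients = [
  FakeClient.new("localhost", 22133, true),
  FakeClient.new("localhost", 22134, false),
]

# Block form: rescue per host so one failure does not abort the loop.
statuses = op_with_result(clients, :ping) do |client|
  begin
    client.ping
  rescue IOError
    "unavailable"
  end
end
statuses["localhost:22133"]  # => true
statuses["localhost:22134"]  # => "unavailable"
```

Without the block, an exception from any one client propagates and no Hash is returned at all.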
Public: Peek at the top item in the queue.
queue_name - The String name of the queue.
Returns a Twirl::Item if an item was found, otherwise nil.
# File lib/twirl/cluster.rb, line 134
def peek(queue_name)
  client_read_op client, :peek, queue_name
end
Public: Which clients can actually reach their server.
Returns Hash of hosts and results.
# File lib/twirl/cluster.rb, line 179
def ping
  multi_client_op_with_result :ping
end
Public: Disconnect from each client's server.
Returns Hash of hosts and results.
# File lib/twirl/cluster.rb, line 200
def quit
  multi_client_op_with_result :quit
end
Public: Reload the config of each client's server.
Returns Hash of hosts and results.
# File lib/twirl/cluster.rb, line 186
def reload
  multi_client_op_with_result :reload
end
Public: Reserve the next item on the queue.
This is a helper method to get an item from a queue and open it for reliable read.
queue_name - The String name of the queue.
options - Additional options.
See KJess::Client#get for all options.
Returns a Twirl::Item if an item was found, otherwise nil.
# File lib/twirl/cluster.rb, line 125
def reserve(queue_name, options = {})
  client_read_op client, :reserve, queue_name, options
end
Private: Ensures that clients will be rotated by changing the client index and resetting the command count.
# File lib/twirl/cluster.rb, line 228
def rotate
  @instrumenter.instrument "op.twirl", {
    op: :rotate,
    metric_type: :counter,
    command_count: @command_count,
    commands_per_client: @commands_per_client,
  }

  @command_count = 0
  @client_index = (@client_index + 1) % @clients.size
end
Private: Makes it so the client will rotate for the next operation.
# File lib/twirl/cluster.rb, line 241
def rotate_for_next_op
  @command_count = @commands_per_client
end
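Taken together, client, rotate, and rotate_for_next_op form a small counter-based round robin: each call to client counts as one command, the index advances once the count reaches the limit, and rotate_for_next_op simply fills the counter so the next call advances early. The sketch below copies that bookkeeping into a hypothetical RotationSketch class. Unlike Twirl::Cluster it does not shuffle the clients or emit instrumentation, so the order is deterministic.

```ruby
# Hypothetical, trimmed-down copy of the rotation bookkeeping.
class RotationSketch
  def initialize(clients, commands_per_client)
    @clients = clients
    @commands_per_client = commands_per_client
    @client_index = 0
    @command_count = 0
  end

  # Returns the client for the next command, rotating first if the
  # current client has used up its quota.
  def client
    rotate if @command_count >= @commands_per_client
    @command_count += 1
    @clients[@client_index]
  end

  # Moves to the next client (wrapping around) and resets the count.
  def rotate
    @command_count = 0
    @client_index = (@client_index + 1) % @clients.size
  end

  # Marks the quota as used up so the next call to client rotates.
  def rotate_for_next_op
    @command_count = @commands_per_client
  end
end

sketch = RotationSketch.new([:a, :b, :c], 2)
3.times.map { sketch.client }  # => [:a, :a, :b]
sketch.rotate_for_next_op      # e.g. after a read that found no item
sketch.client                  # => :c (skipped the second command on :b)
```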
Public: Add an item to the given queue.
queue_name - The String name of the queue.
item - The String item to add to the queue.
expiration - The Number of seconds from now to expire the item (default: 0).
Returns true if successful, false otherwise.
# File lib/twirl/cluster.rb, line 78
def set(queue_name, item, expiration = 0)
  with_retries { |tries|
    @instrumenter.instrument("op.twirl") { |payload|
      payload[:op] = :set
      payload[:bytes] = item.to_s.size
      payload[:queue_name] = queue_name
      payload[:retry] = tries != @retries

      value = if item
        @encoder.dump(item)
      else
        nil
      end

      client.set(queue_name, value, expiration)
    }
  }
end
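The encoder is only ever asked to dump an item on the way in and load a value on the way out, so any object responding to those two methods should fit. As a sketch under that assumption, here is a hypothetical JSON encoder (JsonEncoder is not shipped with Twirl) together with the same nil guard that set applies before encoding.

```ruby
require "json"

# Hypothetical encoder: responds to dump and load, the two calls made on
# @encoder in this file.
module JsonEncoder
  def self.dump(value)
    JSON.generate(value)
  end

  def self.load(value)
    JSON.parse(value)
  end
end

# Same guard as set: a nil item is passed through without being encoded.
def encode_item(encoder, item)
  item ? encoder.dump(item) : nil
end

encoded = encode_item(JsonEncoder, { "id" => 1 })  # => "{\"id\":1}"
JsonEncoder.load(encoded)                          # => {"id"=>1}
encode_item(JsonEncoder, nil)                      # => nil
```

Note that set reports payload[:bytes] from item.to_s.size, so with an encoder like this the instrumented byte count describes the unencoded item rather than the JSON string sent to the server.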
Public: Tell each client to shut down its server.
Returns nothing.
# File lib/twirl/cluster.rb, line 214
def shutdown
  multi_client_op :shutdown
end
Public: Return stats for each client's server.
Returns a Hash of stats for each host.
# File lib/twirl/cluster.rb, line 193
def stats
  multi_client_op_with_result :stats
end
Public: Return the version of each server.
Returns a Hash of hosts and results.
# File lib/twirl/cluster.rb, line 166
def version
  multi_client_op_with_result :version do |client|
    begin
      client.version
    rescue KJess::ProtocolError
      "unavailable"
    end
  end
end
Private: Retries an operation a number of times if it raises one of the retryable errors. Once the tries are used up, the last error is re-raised.
# File lib/twirl/cluster.rb, line 320
def with_retries
  tries = @retries
  begin
    yield tries
  rescue *@retryable_errors
    tries -= 1
    tries > 0 ? retry : raise
  end
end
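Because tries starts at the configured value and is decremented before the retry check, the block runs at most that many times in total: with the default of 5, a command that keeps failing is attempted five times, not once plus five retries. The sketch below reproduces the control flow as a standalone method so the count can be observed. FlakyError and attempts_when_always_failing are hypothetical names, and the instance state is passed in as arguments.

```ruby
# Hypothetical stand-in for one of the retryable errors.
class FlakyError < StandardError; end

# Same control flow as with_retries, with @retries and @retryable_errors
# passed in as arguments.
def with_retries(retries, retryable_errors)
  tries = retries
  begin
    yield tries
  rescue *retryable_errors
    tries -= 1
    tries > 0 ? retry : raise
  end
end

# Counts how many times the block runs when every attempt fails.
def attempts_when_always_failing(retries)
  attempts = 0
  begin
    with_retries(retries, [FlakyError]) do |_tries|
      attempts += 1
      raise FlakyError, "still down"
    end
  rescue FlakyError
    # The final failure is re-raised to the caller; swallow it here.
  end
  attempts
end

attempts_when_always_failing(5)  # => 5
attempts_when_always_failing(1)  # => 1
```

The value yielded to the block equals the configured count only on the first attempt, which is how set and client_read_op derive payload[:retry].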