class Octopus::Proxy

Attributes

proxy_config[RW]

Public Class Methods

new(config = Octopus.config) click to toggle source
# File lib/octopus/proxy.rb, line 19
# Creates the proxy and stores an Octopus::ProxyConfig built from +config+
# in +proxy_config+.
#
# config - value handed to Octopus::ProxyConfig.new; defaults to
#          Octopus.config.
def initialize(config = Octopus.config)
  wrapped_config = Octopus::ProxyConfig.new(config)
  self.proxy_config = wrapped_config
end

Public Instance Methods

check_schema_migrations(shard) click to toggle source
# File lib/octopus/proxy.rb, line 114
# Makes sure the schema migrations table exists on +shard+.
#
# Asks the shard's connection whether the table named by
# ActiveRecord::Migrator.schema_migrations_table_name exists; when it does
# not, +initialize_schema_migrations_table+ is called on that connection.
#
# Returns the (truthy) result of +table_exists?+ when the table is present,
# otherwise the result of +initialize_schema_migrations_table+.
def check_schema_migrations(shard)
  table_present = OctopusModel.using(shard).connection.table_exists?(
    ActiveRecord::Migrator.schema_migrations_table_name
  )
  return table_present if table_present

  OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
clean_connection_proxy() click to toggle source
# File lib/octopus/proxy.rb, line 107
# Resets the proxy's routing state: +current_shard+ goes back to
# Octopus.master_shard, then +current_model+, +current_group+ and +block+
# are set to nil (in that order).
#
# Returns nil.
def clean_connection_proxy
  self.current_shard = Octopus.master_shard
  %w[current_model current_group block].each { |attribute| send("#{attribute}=", nil) }
  nil
end
clear_active_connections!() click to toggle source
# File lib/octopus/proxy.rb, line 157
# Calls +release_connection+ on every pool yielded by
# +with_each_healthy_shard+.
def clear_active_connections!
  with_each_healthy_shard { |pool| pool.release_connection }
end
clear_all_connections!() click to toggle source
# File lib/octopus/proxy.rb, line 161
# Calls +disconnect!+ on every pool yielded by +with_each_healthy_shard+ and,
# on Rails 5.2 or newer, asks +proxy_config+ to reinitialize the shards.
def clear_all_connections!
  with_each_healthy_shard { |pool| pool.disconnect! }

  # On Rails 5.2 it is no longer safe to re-use connection pools after they
  # have been discarded. This happens on forking web servers, for example
  # Phusion Passenger. Therefore, after clearing all connections, the shards
  # are reinitialized to get fresh, non-discarded ConnectionPool objects.
  proxy_config.reinitialize_shards if Octopus.atleast_rails52?
end
clear_query_cache() click to toggle source
# File lib/octopus/proxy.rb, line 153
# Clears the query cache on the connection of every pool yielded by
# +with_each_healthy_shard+ that reports +connected?+. Pools that are not
# connected are skipped.
def clear_query_cache
  with_each_healthy_shard do |pool|
    safe_connection(pool).clear_query_cache if pool.connected?
  end
end
connected?() click to toggle source
# File lib/octopus/proxy.rb, line 172
# Returns true when at least one pool stored in +shards+ reports
# +connected?+, false otherwise.
def connected?
  shards.values.any? { |pool| pool.connected? }
end
connection_pool() click to toggle source
# File lib/octopus/proxy.rb, line 138
# Returns the entry of +shards+ stored under +current_shard+.
def connection_pool
  pools = shards
  pools[current_shard]
end
current_model_replicated?() click to toggle source
# File lib/octopus/proxy.rb, line 192
# Tells whether queries for the current model may be treated as replicated.
#
# Returns the falsy value of +replicated+ when replication is off; otherwise
# returns +current_model.try(:replicated)+ when that is truthy, else the
# value of +fully_replicated?+.
def current_model_replicated?
  replication_enabled = replicated
  return replication_enabled unless replication_enabled

  current_model.try(:replicated) || fully_replicated?
end
delete(*args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 55
# Routes a +delete+ call through +legacy_method_missing_logic+, forwarding
# the arguments and block unchanged.
def delete(*args, &block)
  method_name = 'delete'
  legacy_method_missing_logic(method_name, *args, &block)
end
disable_query_cache!() click to toggle source
# File lib/octopus/proxy.rb, line 148
# Disables the query cache on the connection of every pool yielded by
# +with_each_healthy_shard+ that reports +connected?+. Pools that are not
# connected are skipped.
def disable_query_cache!
  with_each_healthy_shard do |pool|
    safe_connection(pool).disable_query_cache! if pool.connected?
  end
end
enable_query_cache!() click to toggle source
# File lib/octopus/proxy.rb, line 143
# Calls +clear_query_cache+ first, then enables the query cache on the
# connection of every pool yielded by +with_each_healthy_shard+ that reports
# +connected?+.
def enable_query_cache!
  clear_query_cache

  with_each_healthy_shard do |pool|
    safe_connection(pool).enable_query_cache! if pool.connected?
  end
end
execute(sql, name = nil) click to toggle source
# File lib/octopus/proxy.rb, line 36
# Runs +sql+ through +execute+ on the currently selected connection.
#
# The connection is picked before the proxy state is (possibly) reset by
# +clean_connection_proxy+, so the statement still runs on the connection
# that was selected on entry.
def execute(sql, name = nil)
  connection = select_connection
  needs_reset = should_clean_connection_proxy?('execute')
  clean_connection_proxy if needs_reset
  connection.execute(sql, name)
end
initialize_metadata_table() click to toggle source
# File lib/octopus/proxy.rb, line 204
# Calls ActiveRecord::InternalMetadata.create_table inside a transaction
# opened on the currently selected connection.
def initialize_metadata_table
  connection = select_connection
  connection.transaction do
    ActiveRecord::InternalMetadata.create_table
  end
end
initialize_schema_migrations_table() click to toggle source
# File lib/octopus/proxy.rb, line 196
# Creates the schema migrations table on the currently selected connection.
#
# Before Rails 5.2 this delegates to the connection's own
# +initialize_schema_migrations_table+; from Rails 5.2 on it calls
# ActiveRecord::SchemaMigration.create_table inside a transaction instead.
def initialize_schema_migrations_table
  return select_connection.initialize_schema_migrations_table unless Octopus.atleast_rails52?

  select_connection.transaction { ActiveRecord::SchemaMigration.create_table }
end
insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = []) click to toggle source
# File lib/octopus/proxy.rb, line 42
# Forwards an +insert+ to the currently selected connection.
#
# The connection is picked before the proxy state is (possibly) reset by
# +clean_connection_proxy+, so the insert still runs on the connection that
# was selected on entry.
def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
  connection = select_connection
  needs_reset = should_clean_connection_proxy?('insert')
  clean_connection_proxy if needs_reset
  connection.insert(arel, name, pk, id_value, sequence_name, binds)
end
method_missing(method, *args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 130
# Forwards any call the proxy does not define itself to
# +legacy_method_missing_logic+, passing the method name, arguments and block
# through unchanged.
def method_missing(method, *args, &block)
  forwarded = [method, *args]
  legacy_method_missing_logic(*forwarded, &block)
end
respond_to?(method, include_private = false) click to toggle source
Calls superclass method
# File lib/octopus/proxy.rb, line 134
# Reports whether the proxy can handle +method+: first via the superclass
# implementation, and failing that by asking the currently selected
# connection (with the same +include_private+ flag).
def respond_to?(method, include_private = false)
  handled_here = super
  return handled_here if handled_here

  select_connection.respond_to?(method, include_private)
end
run_queries_on_shard(shard) { || ... } click to toggle source
# File lib/octopus/proxy.rb, line 83
# Runs the given block with +shard+ as the active shard.
#
# The block is wrapped in +keeping_connection_proxy+ and, inside that,
# +using_shard+, both called with +shard+. Returns whatever those wrappers
# return.
def run_queries_on_shard(shard, &_block)
  keeping_connection_proxy(shard) { using_shard(shard) { yield } }
end
safe_connection(connection_pool) click to toggle source

Rails 3.1 sets automatic_reconnect to false when it removes a connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.

# File lib/octopus/proxy.rb, line 71
# Returns +connection_pool.connection+ after making the pool usable again.
#
# * +automatic_reconnect+ is switched to true when it is currently falsy.
# * If the pool is not connected yet and the master shard's connection has
#   +query_cache_enabled+, the query cache is enabled on this pool's
#   connection as well.
def safe_connection(connection_pool)
  connection_pool.automatic_reconnect = true unless connection_pool.automatic_reconnect

  unless connection_pool.connected?
    master_pool = shards[Octopus.master_shard]
    connection_pool.connection.enable_query_cache! if master_pool.connection.query_cache_enabled
  end

  connection_pool.connection
end
select_all(*args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 59
# Routes a +select_all+ call through +legacy_method_missing_logic+,
# forwarding the arguments and block unchanged.
def select_all(*args, &block)
  method_name = 'select_all'
  legacy_method_missing_logic(method_name, *args, &block)
end
select_connection() click to toggle source
# File lib/octopus/proxy.rb, line 79
# Looks up the pool stored in +shards+ under +shard_name+ and returns its
# connection via +safe_connection+.
def select_connection
  pool = shards[shard_name]
  safe_connection(pool)
end
select_value(*args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 63
# Routes a +select_value+ call through +legacy_method_missing_logic+,
# forwarding the arguments and block unchanged.
def select_value(*args, &block)
  method_name = 'select_value'
  legacy_method_missing_logic(method_name, *args, &block)
end
send_queries_to_all_shards(&block) click to toggle source
# File lib/octopus/proxy.rb, line 103
# Runs the block once per distinct entry in +shards+.
#
# +shard_names+ is de-duplicated by the object each name maps to in
# +shards+, so names that map to the same entry are only visited once
# (the first such name is kept).
def send_queries_to_all_shards(&block)
  distinct_names = shard_names.uniq { |name| shards[name] }
  send_queries_to_multiple_shards(distinct_names, &block)
end
send_queries_to_group(group, &block) click to toggle source
# File lib/octopus/proxy.rb, line 97
# Runs the block on every shard returned by +shards_for_group(group)+, with
# +group+ set as the current group (via +using_group+) for the duration.
def send_queries_to_group(group, &block)
  using_group(group) { send_queries_to_multiple_shards(shards_for_group(group), &block) }
end
send_queries_to_multiple_shards(shards, &block) click to toggle source
# File lib/octopus/proxy.rb, line 91
# Runs the block on each entry of +shards+ through +run_queries_on_shard+.
#
# Returns an array with one result per shard, in the order given.
def send_queries_to_multiple_shards(shards, &block)
  shards.map { |target| run_queries_on_shard(target, &block) }
end
send_queries_to_shard_slave_group(method, *args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 180
# Sends the query to the balancer stored in +shards_slave_groups+ under
# +current_shard+ and then +current_slave_group+.
def send_queries_to_shard_slave_group(method, *args, &block)
  balancer = shards_slave_groups[current_shard][current_slave_group]
  send_queries_to_balancer(balancer, method, *args, &block)
end
send_queries_to_slave_group(method, *args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 188
# Sends the query to the balancer stored in +slave_groups+ under
# +current_slave_group+.
def send_queries_to_slave_group(method, *args, &block)
  balancer = slave_groups[current_slave_group]
  send_queries_to_balancer(balancer, method, *args, &block)
end
should_send_queries_to_shard_slave_group?(method) click to toggle source
# File lib/octopus/proxy.rb, line 176
# Decides whether +method+ should go to a slave group of the current shard.
#
# Requires +should_use_slaves_for_method?+ to be truthy (its falsy value is
# returned as-is otherwise) and a present entry in +shards_slave_groups+ for
# +current_shard+ / +current_slave_group+.
def should_send_queries_to_shard_slave_group?(method)
  slaves_allowed = should_use_slaves_for_method?(method)
  return slaves_allowed unless slaves_allowed

  shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present?
end
should_send_queries_to_slave_group?(method) click to toggle source
# File lib/octopus/proxy.rb, line 184
# Decides whether +method+ should go to a (non shard-specific) slave group.
#
# Requires +should_use_slaves_for_method?+ to be truthy (its falsy value is
# returned as-is otherwise) and a present entry in +slave_groups+ for
# +current_slave_group+.
def should_send_queries_to_slave_group?(method)
  slaves_allowed = should_use_slaves_for_method?(method)
  return slaves_allowed unless slaves_allowed

  slave_groups.try(:[], current_slave_group).present?
end
transaction(options = {}, &block) click to toggle source
# File lib/octopus/proxy.rb, line 120
# Opens a transaction on the selected connection.
#
# When the setup is not sharded and the current model is replicated, the
# transaction is opened inside +run_queries_on_shard(Octopus.master_shard)+;
# in every other case it is opened directly on +select_connection+.
def transaction(options = {}, &block)
  return select_connection.transaction(options, &block) if sharded || !current_model_replicated?

  run_queries_on_shard(Octopus.master_shard) { select_connection.transaction(options, &block) }
end
update(arel, name = nil, binds = []) click to toggle source
# File lib/octopus/proxy.rb, line 48
# Forwards an +update+ to the currently selected connection.
#
# The connection is picked before the proxy state is (possibly) reset, so the
# update still runs on the connection that was selected on entry.
def update(arel, name = nil, binds = [])
  connection = select_connection
  # The legacy should_clean_connection_proxy? check is asked about 'insert'
  # on purpose: an update is treated like an insert here.
  needs_reset = should_clean_connection_proxy?('insert')
  clean_connection_proxy if needs_reset
  connection.update(arel, name, binds)
end

Protected Instance Methods

keeping_connection_proxy(shard) { || ... } click to toggle source

Temporarily block cleaning connection proxy and run the block

@see Octopus::Proxy#should_clean_connection_proxy? @see Octopus::Proxy#clean_connection_proxy

# File lib/octopus/proxy.rb, line 324
# Sets +block+ to +shard+ while the given block runs, then restores the
# previous value (or nil when the previous value was falsy), even if the
# block raises.
#
# Returns the value of the given block.
def keeping_connection_proxy(shard, &_block)
  previous_block = block

  begin
    self.block = shard
    yield
  ensure
    self.block = previous_block ? previous_block : nil
  end
end
legacy_method_missing_logic(method, *args, &block) click to toggle source

@thiagopradi - This legacy method missing logic will be kept for a while for compatibility and will be removed when Octopus 1.0 is released. We are planning to migrate to a much more stable logic for the Proxy that doesn't require method missing.

# File lib/octopus/proxy.rb, line 213
# Routes +method+ to the right connection. The checks run in this order and
# the first one that applies wins:
#
# 1. +should_clean_connection_proxy?+ - call on the selected connection after
#    resetting the proxy state (the connection is picked before the reset).
# 2. +should_send_queries_to_shard_slave_group?+
# 3. +should_send_queries_to_slave_group?+
# 4. +should_send_queries_to_replicated_databases?+
# 5. Otherwise call on the selected connection; an ActiveRecord::Result
#    return value gets its +current_shard+ set to +shard_name+.
def legacy_method_missing_logic(method, *args, &block)
  if should_clean_connection_proxy?(method)
    connection = select_connection
    clean_connection_proxy
    return connection.send(method, *args, &block)
  end

  return send_queries_to_shard_slave_group(method, *args, &block) if should_send_queries_to_shard_slave_group?(method)
  return send_queries_to_slave_group(method, *args, &block) if should_send_queries_to_slave_group?(method)
  return send_queries_to_selected_slave(method, *args, &block) if should_send_queries_to_replicated_databases?(method)

  result = select_connection.send(method, *args, &block)
  result.current_shard = shard_name if result.instance_of?(ActiveRecord::Result)
  result
end
send_queries_to_balancer(balancer, method, *args, &block) click to toggle source

Temporarily switch `current_shard` to the next slave in a slave group and send queries to it while preserving `current_shard`

# File lib/octopus/proxy.rb, line 304
# Asks +balancer+ for its next slave (passing +current_load_balance_options+)
# and sends the query to that slave via +send_queries_to_slave+.
def send_queries_to_balancer(balancer, method, *args, &block)
  next_slave = balancer.next(current_load_balance_options)
  send_queries_to_slave(next_slave, method, *args, &block)
end
send_queries_to_selected_slave(method, *args, &block) click to toggle source
# File lib/octopus/proxy.rb, line 275
# Picks the shard that should serve the query and sends the query to it.
#
# When the current model is replicated, or +fully_replicated?+ is true, the
# next slave from +slaves_load_balancer+ is used; otherwise the query goes to
# Octopus.master_shard.
#
# +current_model+ is read with +try+, as +current_model_replicated?+ does, so
# a nil +current_model+ (the state +clean_connection_proxy+ leaves behind) no
# longer raises NoMethodError and falls through to the +fully_replicated?+
# check instead.
def send_queries_to_selected_slave(method, *args, &block)
  selected_slave =
    if current_model.try(:replicated) || fully_replicated?
      slaves_load_balancer.next(current_load_balance_options)
    else
      Octopus.master_shard
    end

  send_queries_to_slave(selected_slave, method, *args, &block)
end
send_queries_to_slave(slave, method, *args, &block) click to toggle source

Temporarily switch `current_shard` to the specified slave and send queries to it while preserving `current_shard`

# File lib/octopus/proxy.rb, line 310
# Runs +method+ on the selected connection inside +using_shard(slave)+.
#
# An ActiveRecord::Result return value gets its +current_shard+ set to
# +slave+; any other return value is passed back untouched.
def send_queries_to_slave(slave, method, *args, &block)
  using_shard(slave) do
    result = select_connection.send(method, *args, &block)
    result.current_shard = slave if result.instance_of?(ActiveRecord::Result)
    result
  end
end
should_clean_connection_proxy?(method) click to toggle source
# File lib/octopus/proxy.rb, line 266
# Decides whether the proxy state should be reset around a call to +method+.
#
# Truthy only when all of these hold:
# * the method name contains "insert", "select" or "execute" (nil is
#   returned when it does not);
# * the current model is not replicated;
# * +block+ is unset or differs from +current_shard+.
def should_clean_connection_proxy?(method)
  name_matches = method.to_s =~ /insert|select|execute/
  return name_matches unless name_matches
  return false if current_model_replicated?

  !block || block != current_shard
end
should_send_queries_to_replicated_databases?(method) click to toggle source

Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slave groups are defined

# File lib/octopus/proxy.rb, line 271
# Truthy only when +replicated+ is set, the method name contains "select",
# +block+ is unset and no slave groups are defined. The first failing
# condition's falsy value is what gets returned.
def should_send_queries_to_replicated_databases?(method)
  replication_enabled = replicated
  return replication_enabled unless replication_enabled

  is_select = method.to_s =~ /select/
  return is_select unless is_select
  return false if block

  !slaves_grouped?
end
should_use_slaves_for_method?(method) click to toggle source

We should use slaves if and only if it's safe to do so.

We can safely use slaves when: (1) `replicated: true` is specified in `shards.yml` (2) The current model is `replicated()`, or `fully_replicated: true` is specified in `shards.yml`, which means that

all models are `replicated()`

(3) It's a SELECT query while ensuring that we revert `current_shard` from the selected slave to the (shard's) master not to make queries other than SELECT leak to the slave.

# File lib/octopus/proxy.rb, line 294
# Truthy when the current model is replicated and the method name contains
# "select". Returns the falsy value of +current_model_replicated?+ when the
# model is not replicated, otherwise the result of the regexp match (an
# index or nil).
def should_use_slaves_for_method?(method)
  model_replicated = current_model_replicated?
  return model_replicated unless model_replicated

  method.to_s =~ /select/
end
slaves_grouped?() click to toggle source
# File lib/octopus/proxy.rb, line 298
# Returns whether +slave_groups+ is +present?+.
def slaves_grouped?
  groups = slave_groups
  groups.present?
end
using_group(group) { || ... } click to toggle source

Temporarily switch `current_group` and run the block

# File lib/octopus/proxy.rb, line 354
# Sets +current_group+ to +group+ while the given block runs and restores
# the previous group afterwards, even if the block raises.
#
# Returns the value of the given block.
def using_group(group, &_block)
  previous_group = current_group

  begin
    self.current_group = group
    yield
  ensure
    # Always put the caller's group back.
    self.current_group = previous_group
  end
end
using_shard(shard) { || ... } click to toggle source

Temporarily switch `current_shard` and run the block

# File lib/octopus/proxy.rb, line 336
# Switches +current_shard+ to +shard+ while the given block runs.
#
# The switch is skipped when there is a +current_model+ that does not allow
# +shard+ (+allowed_shard?+ is falsy). Afterwards +current_shard+,
# +current_slave_group+ and +current_load_balance_options+ are all restored
# to the values they had on entry, even if the block raises.
#
# Returns the value of the given block.
def using_shard(shard, &_block)
  previous_shard = current_shard
  previous_slave_group = current_slave_group
  previous_load_balance_options = current_load_balance_options

  begin
    self.current_shard = shard if !current_model || current_model.allowed_shard?(shard)
    yield
  ensure
    self.current_shard = previous_shard
    self.current_slave_group = previous_slave_group
    self.current_load_balance_options = previous_load_balance_options
  end
end
with_each_healthy_shard() { |v| ... } click to toggle source

Ensure that a single failing slave doesn't take down the entire application

# File lib/octopus/proxy.rb, line 236
# Yields every pool in +shards+, then every pool in ActiveRecord's
# connection handler list except the one stored as +shards[:master]+ (it was
# already yielded by the first pass).
#
# An error raised for one pool is logged through Octopus.logger and swallowed
# when Octopus.robust_environment? is true, so the remaining pools are still
# visited; otherwise the error is re-raised.
def with_each_healthy_shard
  shards.each do |name, pool|
    begin
      yield(pool)
    rescue => e
      raise unless Octopus.robust_environment?

      Octopus.logger.error "Error on shard #{name}: #{e.message}"
    end
  end

  ActiveRecord::Base.connection_handler.connection_pool_list.each do |pool|
    next if pool == shards[:master] # Already handled this

    begin
      yield(pool)
    rescue => e
      raise unless Octopus.robust_environment?

      Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}"
    end
  end
end