class QC::ConnAdapter

Attributes

connection[RW]

Public Class Methods

new(c=nil) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 8
def initialize(c=nil)
  @connection = c.nil? ? establish_new : validate!(c)
  @mutex = Mutex.new
end

Public Instance Methods

disconnect() click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 41
def disconnect
  @mutex.synchronize do
    begin
      @connection.close
    rescue => e
      QC.log(:at => 'disconnect', :error => e.message)
    end
  end
end
execute(stmt, *params) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 13
def execute(stmt, *params)
  @mutex.synchronize do
    QC.log(:at => "exec_sql", :sql => stmt.inspect)
    begin
      params = nil if params.empty?
      r = @connection.exec(stmt, params)
      result = []
      r.each {|t| result << t}
      result.length > 1 ? result : result.pop
    rescue PG::Error => e
      QC.log(:error => e.inspect)
      @connection.reset
      raise
    end
  end
end
server_version() click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 51
def server_version
  @server_version ||= begin
                        version = execute("SHOW server_version_num;")["server_version_num"]
                        version && version.to_i
                      end
end
wait(time, *channels) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 30
def wait(time, *channels)
  @mutex.synchronize do
    listen_cmds = channels.map {|c| 'LISTEN "' + c.to_s + '"'}
    @connection.exec(listen_cmds.join(';'))
    wait_for_notify(time)
    unlisten_cmds = channels.map {|c| 'UNLISTEN "' + c.to_s + '"'}
    @connection.exec(unlisten_cmds.join(';'))
    drain_notify
  end
end

Private Instance Methods

db_url() click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 102
def db_url
  return @db_url if defined?(@db_url) && @db_url
  url = ENV["QC_DATABASE_URL"] ||
        ENV["DATABASE_URL"]    ||
        raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")
  @db_url = URI.parse(url)
end
drain_notify() click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 66
def drain_notify
  until @connection.notifies.nil?
    QC.log(:at => "drain_notifications")
  end
end
establish_new() click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 78
def establish_new
  QC.log(:at => "establish_conn")
  conn = PG.connect(*normalize_db_url(db_url))
  if conn.status != PG::CONNECTION_OK
    QC.log(:error => conn.error)
  end
  conn.exec("SET application_name = '#{QC.app_name}'")
  conn
end
normalize_db_url(url) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 88
def normalize_db_url(url)
  host = url.host
  host = host.gsub(/%2F/i, '/') if host

  [
   host, # host or percent-encoded socket path
   url.port || 5432,
   nil, '', #opts, tty
   url.path.gsub("/",""), # database name
   url.user,
   url.password
  ]
end
validate!(c) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 72
def validate!(c)
  return c if c.is_a?(PG::Connection)
  err = "connection must be an instance of PG::Connection, but was #{c.class}"
  raise(ArgumentError, err)
end
wait_for_notify(t) click to toggle source
# File lib/queue_classic/conn_adapter.rb, line 60
def wait_for_notify(t)
  Array.new.tap do |msgs|
    @connection.wait_for_notify(t) {|event, pid, msg| msgs << msg}
  end
end