class Emque::Consuming::Runner

Attributes

instance[RW]
control[RW]
options[RW]
persist[RW]
pid[RW]
pidfile[RW]
receivers[RW]
status[RW]

Public Class Methods

new(options = {}) click to toggle source
# File lib/emque/consuming/runner.rb, line 20
def initialize(options = {})
  self.control = Emque::Consuming::Control.new
  self.options = options
  self.receivers = []
  self.status = Emque::Consuming::Status.new
  apply_options
  Emque::Consuming
    .application
    .initialize_logger(daemonized: options.fetch(:daemon) { false })
  self.class.instance = self
  self.pidfile = options.fetch(:pidfile, default_pidfile)
  self.pid = Emque::Consuming::Pidfile.new(pidfile)
end

Public Instance Methods

app() click to toggle source
Calls superclass method Emque::Consuming::Helpers#app
# File lib/emque/consuming/runner.rb, line 34
def app
  super
end
console() click to toggle source
# File lib/emque/consuming/runner.rb, line 38
def console
  require "pry"
  Pry.start
end
http?() click to toggle source
# File lib/emque/consuming/runner.rb, line 43
def http?
  config.status == :on
end
phased_restart() click to toggle source
# File lib/emque/consuming/runner.rb, line 47
def phased_restart
  receivers.each { |r| r.stop && r.start }
end
restart() click to toggle source
# File lib/emque/consuming/runner.rb, line 51
def restart
  stop && start
end
restart_application() click to toggle source
# File lib/emque/consuming/runner.rb, line 55
def restart_application
  receivers.first.restart
end
sock?() click to toggle source
# File lib/emque/consuming/runner.rb, line 59
def sock?
  true
end
start() click to toggle source
# File lib/emque/consuming/runner.rb, line 63
def start
  exit_if_already_running!
  daemonize! if daemonize?
  write_pidfile!
  @persist = Thread.new { loop { sleep 1 } }
  set_process_title
  setup_receivers
  receivers.each(&:start)
  persist.join
rescue Interrupt
  stop
end
stop(timeout: 5) click to toggle source
# File lib/emque/consuming/runner.rb, line 76
def stop(timeout: 5)
  if persist
    Thread.new do
      sleep timeout
      logger.error("Timeout Exceeded. Forcing Shutdown.")
      persist.exit if persist.alive?
    end
    receivers.each(&:stop)
    logger.info("Graceful shutdown successful.")
    logger.info("#{config.app_name.capitalize} stopped.")
    persist.exit if persist.alive?
  else
    Emque::Consuming::Transmitter.send(
      :command => :stop,
      :socket_path => config.socket_path
    )
  end
end

Private Instance Methods

apply_options() click to toggle source
# File lib/emque/consuming/runner.rb, line 100
def apply_options
  options.each do |attr, val|
    config.send("#{attr}=", val) if config.respond_to?(attr)
  end
end
config() click to toggle source
# File lib/emque/consuming/runner.rb, line 106
def config
  Emque::Consuming.application.config
end
daemonize!() click to toggle source
# File lib/emque/consuming/runner.rb, line 114
def daemonize!
  Process.daemon(true, true)

  [$stdout, $stderr].each do |io|
    File.open(Emque::Consuming.application.logfile, "ab") do |f|
      io.reopen(f)
    end
    io.sync = true
  end

  $stdin.reopen("/dev/null")
end
daemonize?() click to toggle source
# File lib/emque/consuming/runner.rb, line 110
def daemonize?
  options[:daemon]
end
default_pidfile() click to toggle source
# File lib/emque/consuming/runner.rb, line 127
def default_pidfile
  File.join(
    Emque::Consuming.application.root,
    "tmp",
    "pids",
    "#{config.app_name}.pid"
  )
end
exit_if_already_running!() click to toggle source
# File lib/emque/consuming/runner.rb, line 136
def exit_if_already_running!
  if pid.running?
    [
      "Pid file exists. Process #{pid} active.",
      "Please ensure app is not running."
    ].each do |msg|
      logger.error(msg)
      $stdout.puts(msg)
    end

    exit
  end
end
set_process_title() click to toggle source
# File lib/emque/consuming/runner.rb, line 150
def set_process_title
  title = "#{config.app_name} [pidfile: #{pidfile}"
  title << " | unix socket: #{config.socket_path}" if sock?
  title << " | http://#{config.status_host}:#{config.status_port}" if http?
  title << "]"
  $0 = title
end
setup_receivers() click to toggle source
# File lib/emque/consuming/runner.rb, line 158
def setup_receivers
  receivers << app
  receivers << Emque::Consuming::CommandReceivers::UnixSocket.new if sock?
  receivers << Emque::Consuming::CommandReceivers::HttpServer.new if http?
end
write_pidfile!() click to toggle source
# File lib/emque/consuming/runner.rb, line 164
def write_pidfile!
  pid.write
  at_exit { FileUtils.rm_f(pidfile) }
end