class Emque::Consuming::Adapters::RabbitMq::Manager

Attributes

delayed_message_workers[W]
workers[W]

Public Instance Methods

actor_died(actor, reason) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 11
def actor_died(actor, reason)
  unless shutdown
    logger.error "RabbitMQ Manager: actor_died - #{actor.inspect} " +
                 "died: #{reason}"
  end
end
delayed_message_workers() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 70
def delayed_message_workers
  @delayed_message_workers
end
retry_errors() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 74
def retry_errors
  ErrorWorker.new(@connection).retry_errors
end
start() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 18
def start
  setup_connection
  initialize_error_queue
  initialize_workers
  initialize_delayed_message_workers if enable_delayed_message
  logger.info "RabbitMQ Manager: starting #{worker_count} workers..."
  workers(:flatten => true).each do |worker|
    worker.async.start
  end
  if enable_delayed_message
    delayed_message_workers.each do |worker|
      worker.async.start
    end
  end
end
stop() click to toggle source
Calls superclass method Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 34
def stop
  logger.info "RabbitMQ Manager: stopping #{worker_count} workers..."

  super do
    workers(:flatten => true).each do |worker|
      logger.info "RabbitMQ Manager: stopping #{worker.topic} worker..."
      worker.stop
    end
    if enable_delayed_message
      delayed_message_workers.each_with_index do |worker, i|
        logger.info "RabbitMQ Manager: stopping #{worker.class} #{i + 1} worker..."
        worker.stop
      end
    end
  end

  @connection.stop
end
worker(topic:, command:) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 53
def worker(topic:, command:)
  if workers.has_key?(topic)
    case command
    when :down
      worker = workers[topic].pop
      worker.stop if worker
    when :up
      workers[topic] << new_worker(topic)
      workers[topic].last.async.start
    end
  end
end
workers(flatten: false) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 66
def workers(flatten: false)
  flatten ? @workers.values.flatten : @workers
end

Private Instance Methods

enable_delayed_message() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 101
def enable_delayed_message
  config.enable_delayed_message
end
initialize_delayed_message_workers() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 93
def initialize_delayed_message_workers
  self.delayed_message_workers = [].tap { |workers|
    config.delayed_message_workers.times do
      workers << DelayedMessageWorker.new_link(@connection)
    end
  }
end
initialize_error_queue() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 105
def initialize_error_queue
  channel = @connection.create_channel
  error_exchange = channel.fanout(
    "#{config.app_name}.error",
    :durable => true,
    :auto_delete => false
  )
  channel.queue(
    "emque.#{config.app_name}.error",
    :durable => true,
    :auto_delete => false,
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  ).bind(error_exchange)
  channel.close
end
initialize_workers() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 82
def initialize_workers
  self.workers = {}.tap { |workers|
    router.topic_mapping.keys.each do |topic|
      workers[topic] ||= []
      router.workers(topic).times do
        workers[topic] << new_worker(topic)
      end
    end
  }
end
new_worker(topic) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 123
def new_worker(topic)
  Worker.new_link(@connection, topic)
end
setup_connection() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 127
def setup_connection
  @connection = Bunny.new(config.adapter.options[:url])
  @connection.start
end
worker_count() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/manager.rb, line 132
def worker_count
  workers(:flatten => true).size
end