class Emque::Consuming::Adapters::RabbitMq::ErrorWorker

Attributes

channel[RW]
connection[RW]

Public Class Methods

new(connection) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 10
def initialize(connection)
  self.connection = connection
  self.channel = connection.create_channel
end

Public Instance Methods

retry_errors() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 15
def retry_errors
  logger.info "#{log_prefix} starting"
  channel.open if channel.closed?
  [error_queue.message_count, 100].min.times do
    delivery_info, properties, payload = error_queue.pop(
      {:manual_ack => true}
    )
    if delivery_info && properties && payload
      retry_message(delivery_info, properties, payload)
    end
  end
  channel.close
  logger.info "#{log_prefix} done"
end

Private Instance Methods

error_queue() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 34
def error_queue
  channel.queue(
    "emque.#{config.app_name}.error",
    :durable => true,
    :auto_delete => false,
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  )
end
log_prefix() click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 45
def log_prefix
  "RabbitMQ ErrorWorker:"
end
retry_message(delivery_info, metadata, payload) click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 49
def retry_message(delivery_info, metadata, payload)
  begin
    logger.info "#{log_prefix} processing message #{metadata}"
    logger.debug "#{log_prefix} payload #{payload}"
    message = Emque::Consuming::Message.new(
      :original => payload
    )
    ::Emque::Consuming::Consumer.new.consume(:process, message)
    channel.ack(delivery_info.delivery_tag)
  rescue StandardError => exception
    channel.nack(delivery_info.delivery_tag)
  end
end