class Emque::Consuming::Adapters::RabbitMq::ErrorWorker
Attributes
channel[RW]
connection[RW]
Public Class Methods
new(connection)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 10 def initialize(connection) self.connection = connection self.channel = connection.create_channel end
Public Instance Methods
retry_errors()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 15 def retry_errors logger.info "#{log_prefix} starting" channel.open if channel.closed? [error_queue.message_count, 100].min.times do delivery_info, properties, payload = error_queue.pop( {:manual_ack => true} ) if delivery_info && properties && payload retry_message(delivery_info, properties, payload) end end channel.close logger.info "#{log_prefix} done" end
Private Instance Methods
error_queue()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 34 def error_queue channel.queue( "emque.#{config.app_name}.error", :durable => true, :auto_delete => false, :arguments => { "x-dead-letter-exchange" => "#{config.app_name}.error" } ) end
log_prefix()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 45 def log_prefix "RabbitMQ ErrorWorker:" end
retry_message(delivery_info, metadata, payload)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/error_worker.rb, line 49 def retry_message(delivery_info, metadata, payload) begin logger.info "#{log_prefix} processing message #{metadata}" logger.debug "#{log_prefix} payload #{payload}" message = Emque::Consuming::Message.new( :original => payload ) ::Emque::Consuming::Consumer.new.consume(:process, message) channel.ack(delivery_info.delivery_tag) rescue StandardError => exception channel.nack(delivery_info.delivery_tag) end end