class Hutch::Worker
Attributes
setup_procs[RW]
Public Class Methods
new(broker, consumers, setup_procs)
click to toggle source
# File lib/hutch/worker.rb, line 13
# Builds a worker around the given broker.
#
# broker      - kept in @broker and used by the other worker methods.
# consumers   - assigned through the consumers= writer.
# setup_procs - assigned through the setup_procs= writer.
def initialize(broker, consumers, setup_procs)
  @broker = broker

  # Go through the writers (not the ivars) so any writer logic runs.
  self.consumers   = consumers
  self.setup_procs = setup_procs
end
Public Instance Methods
acknowledge_error(delivery_info, properties, broker, ex)
click to toggle source
# File lib/hutch/worker.rb, line 90
# Offers a failed delivery to each error-acknowledgement backend in order,
# stopping at the first one whose #handle returns a truthy value.
# A NackOnAllFailures instance is appended as the last candidate.
#
# Returns the backend that handled the failure, or nil if none did.
def acknowledge_error(delivery_info, properties, broker, ex)
  chain = error_acknowledgements + [Hutch::Acknowledgements::NackOnAllFailures.new]
  chain.find { |handler| handler.handle(delivery_info, properties, broker, ex) }
end
consumers=(val)
click to toggle source
# File lib/hutch/worker.rb, line 98
# Stores the consumer list in @consumers, logging a warning first when the
# list is empty.
def consumers=(val)
  logger.warn "no consumer loaded, ensure there's no configuration issue" if val.empty?
  @consumers = val
end
error_acknowledgements()
click to toggle source
# File lib/hutch/worker.rb, line 105
# Returns the value stored under :error_acknowledgements in Hutch::Config.
def error_acknowledgements
  Hutch::Config[:error_acknowledgements]
end
handle_error(*args)
click to toggle source
# File lib/hutch/worker.rb, line 84
# Forwards the given arguments to #handle on every backend listed under
# :error_handlers in Hutch::Config.
def handle_error(*args)
  handlers = Hutch::Config[:error_handlers]
  handlers.each { |handler| handler.handle(*args) }
end
handle_message(consumer, delivery_info, properties, payload)
click to toggle source
Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.
# File lib/hutch/worker.rb, line 61
# Processes one delivery for the given consumer class.
#
# Wraps the delivery in a Message, instantiates the consumer, gives it the
# broker and delivery info, passes the message to the tracer-wrapped
# consumer, and then acks the delivery tag on the broker.
#
# Any StandardError raised along the way is handed to acknowledge_error
# and then to handle_error.
def handle_message(consumer, delivery_info, properties, payload)
  # A consumer-specific serializer wins over the configured one.
  serializer = consumer.get_serializer || Hutch::Config[:serializer]

  # Block form: the message is only built if the logger evaluates the block.
  logger.debug do
    spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
    message_id = properties.message_id || '-'
    "message(#{message_id}): " \
      "routing key: #{delivery_info.routing_key}, " \
      "consumer: #{consumer}, " \
      "payload: #{spec}"
  end

  message = Message.new(delivery_info, properties, payload, serializer)

  consumer_instance = consumer.new
  consumer_instance.broker = @broker
  consumer_instance.delivery_info = delivery_info

  with_tracing(consumer_instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue StandardError => error
  acknowledge_error(delivery_info, properties, @broker, error)
  handle_error(properties, payload, consumer, error)
end
run()
click to toggle source
Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues indefinitely. This method never returns.
# File lib/hutch/worker.rb, line 22
# Runs the worker: sets up the queues, invokes every setup proc, waits in
# Waiter.wait_until_signaled, and calls stop once that wait returns.
def run
  setup_queues
  setup_procs.each { |setup| setup.call }

  Waiter.wait_until_signaled
  stop
end
setup_queue(consumer)
click to toggle source
Bind a consumer's routing keys to its queue, and set up a subscription to receive messages sent to the queue.
# File lib/hutch/worker.rb, line 47
# Declares the consumer's queue on the broker, binds the consumer's routing
# keys to it, and subscribes with manual acks and a unique consumer tag.
# Each delivery is decoded by Hutch::Adapter.decode_message and passed to
# handle_message.
def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"

  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*raw|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*raw)
    handle_message(consumer, delivery_info, properties, payload)
  end
end
setup_queues()
click to toggle source
Set up the queues for each of the worker's consumers.
# File lib/hutch/worker.rb, line 37
# Sets up a queue for each of the worker's consumers.
#
# When a group is configured, consumers for which group_restricted? is true
# are skipped. The group check does not depend on the consumer, so it is
# evaluated once up front; evaluating it inside the filter would repeat its
# log notice for every consumer.
def setup_queues
  logger.info 'setting up queues'

  restrict_by_group = group_configured?
  vetted = @consumers.reject { |c| restrict_by_group && group_restricted?(c) }
  vetted.each { |c| setup_queue(c) }
end
stop()
click to toggle source
Stop a running worker by killing all subscriber threads.
# File lib/hutch/worker.rb, line 32
# Stops the worker by delegating to the broker's #stop.
def stop
  @broker.stop
end
with_tracing(klass)
click to toggle source
# File lib/hutch/worker.rb, line 80
# Wraps the given object in a new instance of the tracer configured under
# :tracer in Hutch::Config and returns that wrapper.
def with_tracing(klass)
  tracer_class = Hutch::Config[:tracer]
  tracer_class.new(klass)
end
Private Instance Methods
consumer_groups()
click to toggle source
# File lib/hutch/worker.rb, line 131
# Returns the value stored under :consumer_groups in Hutch::Config.
def consumer_groups
  Hutch::Config[:consumer_groups]
end
group()
click to toggle source
# File lib/hutch/worker.rb, line 127
# Returns the value stored under :group in Hutch::Config.
def group
  Hutch::Config[:group]
end
group_configured?()
click to toggle source
# File lib/hutch/worker.rb, line 111
# Reports whether a group is configured (group.present?).
# Logs a notice when a group is set but the consumer groups are blank.
def group_configured?
  configured = group.present?
  logger.info 'Consumer groups are blank' if configured && consumer_groups.blank?
  configured
end
group_restricted?(consumer)
click to toggle source
# File lib/hutch/worker.rb, line 118
# Reports whether the consumer is excluded by the configured group.
#
# Returns true when the group has no entry in consumer_groups, or when its
# entry does not include the consumer's name; false otherwise.
def group_restricted?(consumer)
  allowed = consumer_groups[group]
  return true unless allowed

  !allowed.include?(consumer.name)
end
unique_consumer_tag()
click to toggle source
# File lib/hutch/worker.rb, line 137
# Builds a consumer tag of the form "<prefix>-<uuid>", where the prefix is
# read from :consumer_tag_prefix in Hutch::Config.
#
# Raises RuntimeError when the tag is longer than 255 bytes.
def unique_consumer_tag
  prefix = Hutch::Config[:consumer_tag_prefix]
  tag = "#{prefix}-#{SecureRandom.uuid}"

  if tag.bytesize > 255
    raise "Tag must be 255 bytes long at most, current one is #{tag.bytesize} ('#{tag}')"
  end

  tag
end