class Pwwka::Receiver

Attributes

channel[R]
channel_connector[R]
queue_name[R]
routing_key[R]
topic_exchange[R]

Public Class Methods

new(queue_name, routing_key, prefetch: Pwwka.configuration.default_prefetch) click to toggle source
# File lib/pwwka/receiver.rb, line 12
# Sets up a receiver for +queue_name+ using +routing_key+.
#
# A ChannelConnector is created with the given +prefetch+ and a connection
# name of the form "c: <app_id> <process_name>" (surrounding whitespace
# stripped); its channel and topic exchange are kept on the receiver.
def initialize(queue_name, routing_key, prefetch: Pwwka.configuration.default_prefetch)
  @queue_name  = queue_name
  @routing_key = routing_key

  config = Pwwka.configuration
  # strip drops the dangling space left when app_id or process_name is blank
  label = "c: #{config.app_id} #{config.process_name}".strip

  connector = ChannelConnector.new(prefetch: prefetch, connection_name: label)
  @channel_connector = connector
  @channel           = connector.channel
  @topic_exchange    = connector.topic_exchange
end
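subscribe(handler_klass, queue_name, routing_key: "#.#", block: true, prefetch: Pwwka.configuration.default_prefetch, payload_parser: Pwwka.configuration.payload_parser) click to toggle source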
# File lib/pwwka/receiver.rb, line 20
# Subscribes +handler_klass+ to the messages delivered on +queue_name+.
#
# A receiver is built for the queue and routing key. Each delivered message
# is parsed with +payload_parser+, passed to +handler_klass.handle!+ and then
# acknowledged. A StandardError raised along the way is handed to the
# configured error handling chain. An Interrupt raised while subscribing is
# logged instead of propagated, and the connection is always closed before
# this method returns.
#
# Raises a RuntimeError when +handler_klass+ does not respond to +handle!+.
# Returns the receiver.
def self.subscribe(handler_klass, queue_name,
                   routing_key: "#.#",
                   block: true,
                   prefetch: Pwwka.configuration.default_prefetch,
                   payload_parser: Pwwka.configuration.payload_parser)
  unless handler_klass.respond_to?(:handle!)
    raise "#{handler_klass.name} must respond to `handle!`"
  end

  subscriber = new(queue_name, routing_key, prefetch: prefetch)
  begin
    info "Receiving on #{queue_name}"
    subscriber.topic_queue.subscribe(manual_ack: true, block: block) do |delivery_info, properties, payload|
      begin
        # Reassign so the error chain sees the parsed payload when parsing succeeded.
        payload = payload_parser.call(payload)
        handler_klass.handle!(delivery_info, properties, payload)
        subscriber.ack(delivery_info.delivery_tag)
        logf "Processed Message on %{queue_name} -> %{payload}, %{routing_key}", queue_name: queue_name, payload: payload, routing_key: delivery_info.routing_key
      rescue => exception
        error_chain = Pwwka::ErrorHandlers::Chain.new(Pwwka.configuration.error_handling_chain)
        error_chain.handle_error(handler_klass, subscriber, queue_name, payload, delivery_info, exception)
      end
    end
  rescue Interrupt
    # TODO: trap TERM within channel.work_pool
    info "Interrupting queue #{queue_name} subscriber safely"
  ensure
    subscriber.channel_connector.connection_close
  end
  subscriber
end

Public Instance Methods

ack(delivery_tag) click to toggle source
# File lib/pwwka/receiver.rb, line 64
# Acknowledges the message identified by +delivery_tag+ on the channel.
def ack(delivery_tag)
  # Second positional argument; presumably Bunny's "multiple" flag, so only
  # this one delivery is acknowledged — confirm against the channel API.
  multiple = false
  channel.acknowledge(delivery_tag, multiple)
end
drop_queue() click to toggle source
# File lib/pwwka/receiver.rb, line 76
# Purges the topic queue and then deletes it.
# Returns whatever the queue's +delete+ call returns.
def drop_queue
  queue = topic_queue
  queue.purge
  queue.delete
end
nack(delivery_tag) click to toggle source
# File lib/pwwka/receiver.rb, line 68
# Negatively acknowledges the message identified by +delivery_tag+,
# passing +false+ for both trailing channel flags.
def nack(delivery_tag)
  # Flag names follow Bunny's Channel#nack(delivery_tag, multiple, requeue)
  # signature — presumably the channel here is a Bunny channel; confirm.
  multiple = false
  requeue  = false
  channel.nack(delivery_tag, multiple, requeue)
end
nack_requeue(delivery_tag) click to toggle source
# File lib/pwwka/receiver.rb, line 72
# Negatively acknowledges the message identified by +delivery_tag+,
# passing +false+ and +true+ as the trailing channel flags.
def nack_requeue(delivery_tag)
  # Flag names follow Bunny's Channel#nack(delivery_tag, multiple, requeue)
  # signature — presumably the channel here is a Bunny channel; confirm.
  multiple = false
  requeue  = true
  channel.nack(delivery_tag, multiple, requeue)
end
test_teardown() click to toggle source
# File lib/pwwka/receiver.rb, line 81
# Tears down everything this receiver set up: drops the queue, deletes the
# topic exchange, and closes the connection, in that order.
# Returns whatever +connection_close+ returns.
def test_teardown
  drop_queue

  exchange = topic_exchange
  exchange.delete

  connector = channel_connector
  connector.connection_close
end
topic_queue() click to toggle source
# File lib/pwwka/receiver.rb, line 56
# Returns the queue for +queue_name+, declaring it on first use.
#
# The queue is declared durable with empty arguments and bound to the topic
# exchange once per comma-separated entry in +routing_key+. The result is
# memoized, so later calls return the same queue without redeclaring it.
def topic_queue
  return @topic_queue if @topic_queue

  declared = channel.queue(queue_name, durable: true, arguments: {})
  routing_key.split(',').each do |binding_key|
    declared.bind(topic_exchange, routing_key: binding_key)
  end
  @topic_queue = declared
end